0

I use django, celery and redis to launch task asynchronously.

# tasks.py
@shared_task
def my_task():
    # Do stuff of task 1
    return True
# Somewhere in test1.py
my_task.delay()
# Few milli seconds later in test2.py
my_task.delay()

With that configuration, my_task is launched 2 times on 2 different files. So they are executed on different threads almost at the same time.

I need these 2 tasks to be executed one by one. If my_task #1 is executing and another my_task #2 is launched, I need my_task #2 to wait for the #1 to end before executing.

I don't want to use only one thread passing argument to celery celery worker --concurrency=1

Config of celery in my settings.py is basic :

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'

I found many resources who talks about that subject but I don't really understand how to achieve my goal

Rom1
  • 346
  • 4
  • 17

1 Answers1

0

Solution on http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html almost worked. Here is some adaptation :

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=True)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
@task(name='my_app.sample.tasks.single_task')
@only_one(key="SingleTask", timeout=60 * 5)
def single_task(self, **kwargs):
    """Run task."""
    print("test")

The thing is, I don't configure Redis anywhere in my settings.py, so I don't understand how it found the right redis database. I guess it takes it from celery's config.

Rom1
  • 346
  • 4
  • 17