63

I use celery to update RSS feeds in my news aggregation site. I use one @task for each feed, and things seem to work nicely.

There's a detail that I'm not sure I handle well though: all feeds are updated once every minute with a @periodic_task, but what if a feed is still updating from the last periodic task when a new one is started? (For example if the feed is really slow, or offline and the task is held in a retry loop.)

Currently I store tasks results and check their status like this:

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed


_results = {}


@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)


@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error as exc:
        update_feed.retry(args=[feed], exc=exc)

Maybe there is a more sophisticated/robust way of achieving the same result using some celery mechanism that I missed ?

Luper Rouch

6 Answers

47

Based on MattH's answer, you could use a decorator like this:

from django.core.cache import cache
import functools

def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

Then use it like so:

@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60 * 10)
def fetch_articles():
    ...  # yada yada
SteveJ
  • 10
    Thanks; worked for me! Notice however that this does in fact not work with the default Django CACHES setting, because the default is local-memory caching, which means each process has its own cache, so each Celery worker (process) will have its own cache.... – Paul Bormans Jun 06 '15 at 21:19
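
As that comment points out, the lock only works if every worker process sees the same cache. A minimal settings sketch assuming a shared Memcached instance on localhost (the exact backend path depends on your Django version):

# settings.py -- use a shared cache backend so all Celery workers see the same lock.
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.memcached.MemcachedCache",
        "LOCATION": "127.0.0.1:11211",
    }
}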
36

From the official documentation: Ensuring a task is only executed one at a time.
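In case the link moves: the recipe there is built around an atomic cache.add() call used as a lock. A condensed sketch along those lines (not a verbatim copy of the docs; a Django cache backend is assumed and do_import() stands in for the real work):

from contextlib import contextmanager
from hashlib import md5
from time import monotonic

from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.cache import cache

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # lock expires after 10 minutes in case the worker dies mid-task


@contextmanager
def cache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add() only succeeds if the key doesn't exist yet, so it acts as a lock
    # that also expires on its own after LOCK_EXPIRE seconds.
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # Only release the lock if we acquired it and it hasn't already expired.
        if status and monotonic() < timeout_at:
            cache.delete(lock_id)


@shared_task(bind=True)
def import_feed(self, feed_url):
    lock_id = '{0}-lock-{1}'.format(self.name, md5(feed_url.encode()).hexdigest())
    with cache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return do_import(feed_url)  # stand-in for the actual fetch/parse work
    logger.debug('Feed %s is already being imported by another worker', feed_url)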

MattH
  • 1
    I don't see anything superior in this approach, it's way more complex but basically does the same thing (and using the django cache to store locks seems awkward) – Luper Rouch Nov 04 '10 at 13:02
  • 5
    Oh I missed a big detail, it makes the lock process and thread safe. – Luper Rouch Dec 30 '10 at 18:11
  • Do you know if this is still valid when writing to a global variable? http://stackoverflow.com/questions/7719203/celery-periodic-task-running-multiple-times-in-parallel – user Oct 11 '11 at 03:35
  • 5
    @LuperRouch another issue related to your locking mechanism: it only works when there is only one worker running :) – Tommaso Barbugli Oct 03 '12 at 12:35
  • 2
    here is an approach using redis to store the lock: http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html – Florian Jun 10 '13 at 15:56
  • 6
    this link from the *official documentation* is pretty useless when not running celery in a django environment, as it relies on setting a cache key and releasing it once the task has finished. has anyone tried an approach with multiprocessing.Semaphore to prevent tasks from a single worker being executed concurrently? – gru Mar 18 '14 at 11:05
  • This is a terrible suggestion. Setting up yet-another-server just to regulate tasks going into your Celery server is a lot of extra complexity relative to the utility you're getting out of it. – Cerin May 15 '17 at 18:26
  • 1
    @Cerin - service, not server. When the requirement is to ensure that a task is only executed by one worker in a distributed multi-worker system then the functionality required is that of a lock service. There are many ways of implementing a lock service. – MattH May 15 '17 at 19:00
  • Any reason why one wouldn't just create a unique worker for this task with concurrency=1? `... celery worker --queues= fetch_articles_queue --concurrency=1` And then use: `CELERYBEAT_SCHEDULE = {'fetch_articles': {'task': 'path.to.tasks.fetch_articles', 'schedule': 60 # every 60 secs, 'options': {'queue' : 'fetch_articles_queue'} # Use this queue},}` (Doesn't let me do multiline code blocks in comments ...) – luke Jun 20 '17 at 15:26
  • @luke: firstly, you lose the resilience of having multiple distributed workers capable of performing the task. Secondly, it makes it easier to accidentally run the task concurrently. Newer features of task TTL probably mitigate the possible problems of the job overrunning and beat-spawned requests backing up. – MattH Jun 20 '17 at 15:37
  • This example "is using the cache framework to set a lock that’s accessible for all workers." but it's not clear a) Is it visible also to clients (i.e. can it be checked before the task is published?) b) What scope is this visibility? If there are many workers across many machines, can they all see this cache? – Bernd Wechner Apr 15 '20 at 23:11
  • I was wondering why using [celery.app.control.inspect().active()](https://docs.celeryproject.org/en/3.1/reference/celery.app.control.html#celery.app.control.Inspect.active) to get the list of the currently running tasks has been directly discarded. Wouldn't be enough comparing the current task against that list to assure uniqueness? – Ander Feb 24 '21 at 10:45
20

Using https://pypi.python.org/pypi/celery_once seems to do the job really nicely, including reporting errors and testing against some parameters for uniqueness.

You can do things like:

from celery_once import QueueOnce
from myapp.celery import app
from time import sleep

@app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
def start_billing(customer_id, year, month):
    sleep(30)
    return "Done!"

which just needs the following settings in your project:

ONCE_REDIS_URL = 'redis://localhost:6379/0'
ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
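
If I read the celery_once docs correctly, a second call made while a matching task still holds the lock raises AlreadyQueued, unless you ask it to fail silently via the graceful option:

from celery_once import AlreadyQueued

try:
    start_billing.delay(customer_id=42, year=2017, month=8)
except AlreadyQueued:
    pass  # a start_billing task for this customer_id is already queued/running

# or let celery_once return None instead of raising:
start_billing.apply_async(kwargs={'customer_id': 42, 'year': 2017, 'month': 8},
                          once={'graceful': True})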
vdboor
10

If you're looking for an example that doesn't use Django, then try this example (caveat: uses Redis instead, which I was already using).

The decorator code is as follows (full credit to the author of the article, go read it)

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            # Non-blocking acquire: if another worker already holds the Redis lock, skip this run.
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
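
For completeness, applied to the periodic task from the question it might look like this (the lock key and timeout here are arbitrary choices):

@periodic_task(run_every=timedelta(minutes=1))
@only_one(key="fetch_articles", timeout=60 * 5)
def fetch_articles():
    for feed in Feed.objects.all():
        update_feed.delay(feed)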
jbkkd
keithl8041
6

I was wondering why nobody mentioned using celery.app.control.inspect().active() to get the list of currently running tasks. Is it not real-time? Otherwise it would be very easy to implement, for instance:

from functools import wraps

def unique_task(callback, *decorator_args, **decorator_kwargs):
    """
    Decorator to ensure only one instance of the task is running at once.
    """
    @wraps(callback)
    def _wrapper(celery_task, *args, **kwargs):
        # active() maps each worker's name to the list of tasks it is currently running.
        active_workers = celery_task.app.control.inspect().active()
        if active_workers:
            for worker in active_workers:
                for running_task in active_workers[worker]:
                    # Discard the currently running task from the list.
                    if (celery_task.name == running_task['name']
                            and celery_task.request.id != running_task['id']):
                        return f'Task "{callback.__name__}()" cancelled! already running...'

        return callback(celery_task, *args, **kwargs)

    return _wrapper

And then just applying the decorator to the corresponding tasks:

@celery.task(bind=True)
@unique_task
def my_task(self):
    # task executed once at a time.
    pass

Ander
0

This solution is for Celery running on a single host with a concurrency greater than 1. Other kinds of locks without external dependencies like Redis, apart from file-based ones, don't work with concurrency greater than 1.

from datetime import datetime, timedelta
from fcntl import flock, LOCK_EX, LOCK_NB
from hashlib import md5
from os.path import join
from time import sleep

from celery.task import PeriodicTask


class Lock(object):
    def __init__(self, filename):
        self.f = open(filename, 'w')

    def __enter__(self):
        try:
            # Non-blocking exclusive lock: fails immediately if another process holds it.
            flock(self.f.fileno(), LOCK_EX | LOCK_NB)
            return True
        except IOError:
            pass
        return False

    def __exit__(self, *args):
        self.f.close()


class SinglePeriodicTask(PeriodicTask):
    abstract = True
    run_every = timedelta(seconds=1)

    def __call__(self, *args, **kwargs):
        lock_filename = join('/tmp',
                             md5(self.name.encode()).hexdigest())
        with Lock(lock_filename) as is_locked:
            if is_locked:
                super(SinglePeriodicTask, self).__call__(*args, **kwargs)
            else:
                print('already working')


class SearchTask(SinglePeriodicTask):
    restart_delay = timedelta(seconds=60)

    def run(self, *args, **kwargs):
        print(self.name, 'start', datetime.now())
        sleep(5)
        print(self.name, 'end', datetime.now())
user12397901