I have some very simple periodic code using Celery's threading; it simply prints "pre" and "post" and sleeps in between. It is adapted from this StackOverflow question and this linked website.

from datetime import timedelta
from threading import Lock
from time import sleep
import socket

from celery.task import task, periodic_task
from django.core.cache import cache

import main
import cutout_score

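from functools import wraps

def single_instance_task(timeout):
    def task_exc(func):
        @wraps(func)  # keep the wrapped function's name
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            # cache.add only succeeds if the key does not exist yet
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    # pass the task's arguments through to the wrapped function
                    return func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc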

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every = timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

This code never prints 'already in use...'; the same phenomenon occurs when I use the @single_instance_task decorator.

Do you know what's wrong?

Edit: I've simplified the question so that it doesn't write to memory (using a global or the django cache); I still never see 'already in use...'


Edit: When I add the following code to my Django settings.py file (adapted from https://docs.djangoproject.com/en/dev/topics/cache/), everything works as hoped, but only when I use port 11211 (oddly enough, my server is on port 8000).

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    }
}
user

1 Answer

How are you running celeryd? I'm not familiar with a threaded option.

If it's running multi-process, then no "global" variables are shared in memory between workers.

If you want a counter shared between all workers, then I'd suggest you use cache.incr.

E.g.:

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2

Update

What happens if you force your tasks to overlap by sleeping, e.g.:

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

If you don't get "already in use..." when running this more frequently than every 20 seconds, then you know that the cache isn't behaving as expected.
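The overlap test can also be tried without Celery or Django. The following is only a minimal sketch: `InMemoryCache` and `run_overlap_test` are hypothetical names, and the class is a stand-in for a shared backend whose `add()` is atomic, not Django's actual cache. The first run holds the lock (in place of `sleep(20)`) until the second run has attempted to acquire it.

```python
import threading


class InMemoryCache:
    """Hypothetical stand-in for a shared cache with an atomic add()."""

    def __init__(self):
        self._data = {}
        self._guard = threading.Lock()

    def add(self, key, value, timeout=None):
        # Store the value only if the key is absent; report whether it was stored.
        # The timeout is accepted for API similarity but ignored in this sketch.
        with self._guard:
            if key in self._data:
                return False
            self._data[key] = value
            return True

    def delete(self, key):
        with self._guard:
            self._data.pop(key, None)


def run_overlap_test(cache):
    """Run two overlapping tasks and return the messages they produced."""
    messages = []
    first_holds_lock = threading.Event()
    release = threading.Event()

    def task():
        if cache.add("lock", "true", 300):
            try:
                messages.append("pre")
                first_holds_lock.set()
                release.wait(5)  # stands in for sleep(20)
                messages.append("post")
            finally:
                cache.delete("lock")
            return
        messages.append("already in use...")

    first = threading.Thread(target=task)
    second = threading.Thread(target=task)
    first.start()
    first_holds_lock.wait()  # start the second run only while the lock is held
    second.start()
    second.join()
    release.set()  # let the first run finish
    first.join()
    return messages


overlap_messages = run_overlap_test(InMemoryCache())
print(overlap_messages)
```

If the second run also printed "pre" here, the lock store would not be doing its job, which is the same symptom as in the question.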


Another Update

Have you set up a cache backend in your Django settings? E.g. memcached.

If not, you may be using the dummy cache, which implements the interface without actually caching anything, or Django's default local-memory cache, which is per-process and so is not shared between workers. Either sounds like a convincing cause of your problem.
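To see why a non-storing cache defeats this lock, here is a small sketch. Both classes are hypothetical stand-ins written for illustration: `AlwaysAddsCache` assumes that a dummy backend's `add()` reports success on every call, and `StoringCache` assumes a backend that really keeps the key, as memcached would.

```python
class AlwaysAddsCache:
    """Hypothetical cache that stores nothing: add() reports success every time."""

    def add(self, key, value, timeout=None):
        return True

    def delete(self, key):
        pass


class StoringCache:
    """Hypothetical cache that really stores keys."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout=None):
        # Refuse to overwrite an existing key, which is what makes add() a lock.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete(self, key):
        self._data.pop(key, None)


def second_acquire_succeeds(cache):
    """Take the lock once, then report whether a second attempt also succeeds."""
    cache.add("lock", "true", 300)
    second = cache.add("lock", "true", 300)
    cache.delete("lock")
    return second


dummy_result = second_acquire_succeeds(AlwaysAddsCache())
storing_result = second_acquire_succeeds(StoringCache())
print(dummy_result, storing_result)
```

With the non-storing cache the second acquire succeeds, so every worker enters the task and "already in use..." is never printed.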

MattH
  • +1 That sounds like it's related to my problem. I tried using the cache, but still see erratic values of `counter`. Also, I see that multiple workers enter the `test` function. I'm running celeryd with django: `python manage.py celeryd -v 2 -B -s celery -E -l INFO` – user Oct 11 '11 at 14:34
  • Even when I simplify so that the `test` function just prints "hello", it runs on different workers and prints much too frequently (even when I have the `@single_instance_task` decorator defined). – user Oct 12 '11 at 22:16
  • I've simplified the code (above) so that it only prints (as you suggested). It still never prints `'already in use...'`; somehow the cache isn't successfully locking. – user Oct 14 '11 at 16:13
  • Which cache backend are you using? I'm using this recipe successfully with memcached. – MattH Oct 14 '11 at 19:07
  • Ah-- I do `from django.core.cache import cache`; that's the one from ask.github.com/celery/cookbook/tasks.html. Also, it's important to allow celery concurrency >1. With concurrency = 1, it will never produce an error, but will never print 'already in use...' – user Oct 14 '11 at 19:43