
I have a Celery task that mangles some variable. It works perfectly if I run a single Celery worker, but when I use concurrency, it all gets messed up. How can I lock the critical section where the variable is mangled?

inb4: using Python 3.6, with Redis as both broker and result backend. threading.Lock doesn't help here.

f1st

2 Answers


As long as Celery runs with multiple workers (processes), a thread lock will not help, because it only works within a single process. Moreover, a threading lock is useful when you control the overall process, and there is no way to achieve that when using Celery.

This means that Celery requires a distributed lock. For Django I always use django-cache, as in: here. If you need more generic locks, especially Redis-based ones that work for any Python app, you can use sherlock.
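A minimal sketch of the distributed-lock idea, using redis-py's built-in cross-process Lock (the lock name and timeout below are illustrative assumptions; sherlock's API is similar):

```python
# Sketch: guard a Celery task's critical section with a Redis-backed lock.
# Assumes redis-py is installed and a Redis server (e.g. the same one used
# as broker/result backend) is reachable at the default localhost:6379.
from contextlib import contextmanager

@contextmanager
def distributed_lock(name, timeout=60):
    import redis  # deferred import so the sketch can be read without a server
    client = redis.Redis()                     # default connection settings
    lock = client.lock(name, timeout=timeout)  # redis-py's cross-process Lock
    lock.acquire(blocking=True)                # blocks until the lock is free
    try:
        yield
    finally:
        lock.release()

# In the task, wrap only the critical section:
# @app.task
# def mangle():
#     with distributed_lock('mangle-critical-section'):
#         ...  # at most one worker process executes this at a time
```

The timeout matters: if a worker dies while holding the lock, Redis expires it so other workers aren't blocked forever.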

scezar
  • I stumbled upon the first URL you pointed out while googling, but for some reason thought there were easier ways. Anyway, sherlock solved the problem, thanks a lot. – f1st Aug 24 '17 at 09:55

I know this question is 2+ years old, but right now I'm tuning my Celery configs and I came across this topic.

I am using Python 2.7 with Django 1.11 and Celery 4 on a Linux machine. I am using RabbitMQ as a broker.

My setup has Celery running as a daemon and celery beat handling scheduled tasks.

And so, with a dedicated queue for a given task, you can configure this queue to be served by one worker (process) with concurrency=1 (a single subprocess).

This solution solves concurrency problems when Celery runs the task, but if your code calls the task function directly, without going through Celery, it won't respect concurrency principles.
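The concurrency=1 idea can be illustrated with a plain single-consumer queue (stdlib only, no Celery): with exactly one worker draining the queue, the critical section never runs concurrently, so no lock is needed inside the task.

```python
# Stdlib illustration of the concurrency=1 principle: a single consumer
# draining a queue serializes the critical section without an explicit lock.
import queue
import threading

counter = 0  # the "mangled variable" from the question

def critical_section():
    global counter
    counter += 1  # safe here: only the single worker thread runs this

def worker(q):
    while True:
        task = q.get()
        if task is None:       # sentinel: shut the worker down
            break
        task()
        q.task_done()

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()

for _ in range(100):           # many producers may enqueue concurrently...
    q.put(critical_section)    # ...but execution is strictly one-at-a-time

q.put(None)
t.join()
print(counter)  # 100
```

Calling critical_section() directly from the producer would bypass the queue entirely, which is exactly the caveat above about running the task without Celery.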

Example code:

from datetime import timedelta
from kombu import Exchange, Queue

# Exchanges must be defined before the queues that reference them
default_exchange       = Exchange('celery_periodic', type='direct')
celery_task_1_exchange = Exchange('celery_task_1', type='direct')

CELERY_TASK_QUEUES = (
   Queue('celery_periodic', default_exchange, routing_key='celery_periodic'),
   Queue('celery_task_1', celery_task_1_exchange, routing_key='celery_task_1'),
)

CELERY_BEAT_SCHEDULE = {
   'celery-task-1': {
       'task':     'tasks.celery_task_1',
       'schedule': timedelta(minutes=15),
       'queue':    'celery_task_1'
   },
}

And finally, in /etc/default/celeryd (docs here: https://docs.celeryproject.org/en/latest/userguide/daemonizing.html#example-configuration):

CELERYD_NODES="worker1 worker2"
CELERYD_OPTS="--concurrency=1 --time-limit=600 -Q:worker1 celery_periodic -Q:worker2 celery_task_1"

--concurrency N means you will have exactly N worker subprocesses for your worker instance (meaning the worker instance can handle N concurrent tasks) (from here: https://stackoverflow.com/a/44903753/9412892).

Read more here: https://docs.celeryproject.org/en/stable/userguide/workers.html#concurrency

BR, Eduardo
