
This question is a follow-up to "django + celery: disable prefetch for one worker, Is there a bug?"

I had a problem with Celery (see the linked question), and in order to resolve it I'd like to have two Celery workers with --concurrency 1 each, but with two different settings of task_acks_late.

My current approach works, but in my opinion it is not very elegant. I am doing the following:

In settings.py of my Django project:

CELERY_TASK_ACKS_LATE = os.environ.get("LACK", "False") == "True"

This allows me to start the Celery workers with the following commands:

LACK=True celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast
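
Note that the check in settings.py treats only the exact string "True" as enabled, so LACK=true or LACK=1 would silently leave late acknowledgement off. A minimal sketch of a more forgiving helper, using only the standard library, could look like this (env_flag and the set of accepted spellings are hypothetical choices for illustration, not part of Django or Celery):

```python
import os


def env_flag(name, default=False, environ=None):
    """Interpret an environment variable as a boolean flag.

    `environ` is only there so the helper can be exercised without
    touching the real process environment; it defaults to os.environ.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None:
        # Variable not set at all: fall back to the default.
        return default
    # Accept a few common affirmative spellings, case-insensitively.
    return raw.strip().lower() in {"1", "true", "yes", "on"}


# In settings.py this would replace the direct string comparison.
CELERY_TASK_ACKS_LATE = env_flag("LACK")
```

The worker commands stay the same; only the parsing of LACK becomes less strict.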

It would be more intuitive if I could do something like:

celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 --late-ack=True
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast --late-ack=False

I found "Initializing Different Celery Workers with Different Values" but don't understand how to embed this in my Django / Celery context. In which files would I have to add the code that adds an argument to the parser, and how could I use the custom parameter to modify task_acks_late in the Celery settings?

Update: Thanks to @Greenev's answer I managed to add custom options to Celery. However, it seems that changing the config with this mechanism 'arrives too late' and the change is not taken into account.

gelonida
  • Some are Celery cron jobs, some are tasks triggered by an action on the web server (triggered by a Django POST request). The Celery processes will be started either by uwsgi or by supervisor. I haven't decided so far. – gelonida Oct 10 '19 at 18:18
  • @CalebGoodman. No reaction from you. Was my answer not what you expected? Did I miss the intention of your question? – gelonida Oct 14 '19 at 15:51
  • You can annotate each task with acks_late, and turn it on/off... – DejanLekic Oct 15 '19 at 16:25
  • @DejanLekic In fact I want fast tasks to be acknowledged early in one worker and late in the other, so annotating tasks is not the solution as it doesn't know which worker will pick it up. – gelonida Oct 19 '19 at 15:26
  • Nothing prevents you from having two tasks (the second just calls the first...) with different acks_late settings... – DejanLekic Oct 21 '19 at 11:56
  • @DejanLekic Please read the question again and look at the question which triggered this one. The issue is that one task should have a different acks_late setting depending on the worker which picked it up. So the choice of acks_late or not depends on the worker it is running on, which means a per-task configuration is not appropriate. – gelonida Oct 21 '19 at 13:07
  • I think the only way to accomplish that is via task_queues and task_routes. [Reference](https://docs.celeryproject.org/en/latest/userguide/routing.html#routing-tasks) – DejanLekic Oct 21 '19 at 17:00
  • @DejanLekic: I do have a solution that is working (it solves https://stackoverflow.com/questions/58290045/django-celery-disable-prefetch-for-one-worker-is-there-a-bug ). However, the solution (using an env var that is read in the Django settings and running one Celery worker with one value and the other Celery worker with another value of that env var) is not the most elegant one. I am using routes, with one worker handling the fast queue with early ack and the other worker handling the fast and slow queues with late ack. Greenev's answer would be perfect but unfortunately doesn't work so far. – gelonida Oct 21 '19 at 18:43

1 Answer


One possible solution here is to provide acks_late=True as an argument of the shared_task decorator, given your code from the prior question:

@shared_task(acks_late=True)
def task_fast(delay=0.1):
    logger.warning("fast in")
    time.sleep(delay)
    logger.warning("fast out")

Update: I haven't got task_acks_late to be set using this approach, but you could add a command line argument as follows.

You've already linked to a solution. I can't see anything Django-specific here: just put the parser.add_argument code where you have defined your app. Given your code from the prior question, you would have something like this:

app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')

def add_worker_arguments(parser):
    parser.add_argument('--late-ack', default=False)

app.user_options['worker'].add(add_worker_arguments)

Then you could access your argument value in a celeryd_init signal handler:

from celery.signals import celeryd_init

@celeryd_init.connect
def configure_worker(sender=None, conf=None, options=None, **kwargs):
    # argparse stores --late-ack as 'late_ack'; the value arrives as a string
    conf.task_acks_late = str(options.get('late_ack')).lower() == 'true'
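
One detail worth checking when reading a custom option: a parser built on the standard library's argparse stores `--late-ack` under the key `late_ack` (hyphens become underscores), and without a `type=` the value is the raw string, so "False" would be truthy. The following standalone sketch illustrates this with plain argparse, assuming the worker's parser behaves the same way; str_to_bool is a hypothetical helper name, and nothing here touches Celery itself:

```python
import argparse


def str_to_bool(value):
    """Convert a command line string such as 'True' or 'false' to a bool."""
    normalized = value.strip().lower()
    if normalized in {"1", "true", "yes", "on"}:
        return True
    if normalized in {"0", "false", "no", "off"}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean, got {value!r}")


parser = argparse.ArgumentParser()
# type= converts the string at parse time; default applies when the flag is absent.
parser.add_argument("--late-ack", type=str_to_bool, default=False)

# vars() turns the Namespace into a dict, similar to an options mapping.
options = vars(parser.parse_args(["--late-ack=False"]))
```

Here `options` contains the key `late_ack` and no key `late-ack`. This only addresses how the value is parsed; it does not change when the worker reads task_acks_late.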
Greenev
  • Thanks @Greenev. In fact this would be a solution, but not in the context of my initial question ( https://stackoverflow.com/questions/58290045/django-celery-disable-prefetch-for-one-worker-is-there-a-bug ), where I would like the same task to be executed either with acks_late=True or with acks_late=False depending on the worker that handled it. One worker should handle it with acks_late in order to have the best throughput; the other worker should have less throughput, but at least help a little to use the other CPU. – gelonida Oct 14 '19 at 15:49
  • The nicer solution would probably be to add a command line argument to celery. – gelonida Oct 14 '19 at 15:58
  • Thanks for the updated answer. Will try out, but the indentation in your answer seems to be wrong, I think there's also one useless trailing ',' – gelonida Oct 18 '19 at 13:11
  • The following line is also missing. It's perhaps obvious to some people, but probably better to add it: `from celery.signals import celeryd_init`. Can this be in the file project/celery.py? I have the impression it can, but want to be sure about potential boundary cases (issues with import order). – gelonida Oct 18 '19 at 16:17
  • Finally I tried the suggestion of using the celeryd_init signal to modify the conf. Adding an option works fine, and the function connected to celeryd_init is called. However, it seems that the conf is changed too late, as the changes are not taken into account. In fact the Django settings were already imported before (and Celery was probably already configured before, at least the late ack setting). Any other idea? So at the moment I seem to be stuck with my ugly env var trick. – gelonida Oct 18 '19 at 23:30
  • you are right, we have only `task_acks_late` being set here, but not `task.late_ack`, so it seems like we need to dive into the celery source code to solve this issue – Greenev Oct 21 '19 at 12:54
  • Thanks for the answer. For the time being I'll use an env var to solve the issue, but I'd really like to implement a `--late-ack` option. So I'd be curious whether and when there will be a fix. Please keep me updated on this issue. – gelonida Oct 21 '19 at 13:19
  • Now a question to the SO community: what to do with this question? It seems that at the moment there is no other solution than what I already suggested in my question. There is one upvote on Greenev's answer, but at the moment it is not working. It very probably will with a newer version of Celery or some minor modification / monkey patch. If I get no answer from you, then I'll just wait until there is a working answer, and then upvote / check the answer. – gelonida Oct 21 '19 at 13:25
  • Any progress about this issue? Is there perhaps an official issue / enhancement for this? If yes, we could add a link to this question till the issue is fixed – gelonida Nov 07 '19 at 17:00