I have a task in Celery that can only be run once, because it needs to import a new copy of my module and submodules from a different folder each time it runs. Celery does not handle this well: it holds on to the modules from the previous run. It seems like the only solution is to shut down the worker after the task is complete.

However, when I try to shut it down, the message goes back on the queue.

I have tried

  • app.control.shutdown(destination=[hostname]): this shuts down the worker after the task finishes successfully, but the task is still put back on the queue.
  • app.control.broadcast('shutdown', destination=[hostname]): this has the same effect as the command above.
  • revoke(): this lets the task be ignored, but the message still sits on the queue.
  • Raising Ignore() and Reject() exceptions to stop the task if it is redelivered, but this does not solve the problem of redelivery in the first place.

Note:

I am using RabbitMQ as the message broker. This is a long-running task (hours), and we only want one task to run at a time on each worker, with no prefetching, so concurrency is set to 1 and acks_late = True.

Ilani1112

2 Answers

You can use something like Redis or even a database to keep track of whether your task has already been done. Just store a unique ID for each task along with a flag that says whether it's done.

In the task code, check the storage first. If the task is marked as done, just exit without doing anything. If not, do the work and then update the storage to say it's done.
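
Here is a minimal sketch of that check, using SQLite from the standard library as a stand-in for Redis or a real database. The table name `task_runs` and the helpers `make_store`, `is_done`, `mark_done` and `run_once` are made up for illustration; they are not part of Celery.

```python
import sqlite3


def make_store(path=":memory:"):
    """Open (or create) the store that records which task IDs have finished."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_runs ("
        "task_id TEXT PRIMARY KEY, done INTEGER NOT NULL)"
    )
    conn.commit()
    return conn


def is_done(conn, task_id):
    """Return True if this task ID is already flagged as done."""
    row = conn.execute(
        "SELECT done FROM task_runs WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row is not None and row[0] == 1


def mark_done(conn, task_id):
    """Flag this task ID as done."""
    conn.execute(
        "INSERT OR REPLACE INTO task_runs (task_id, done) VALUES (?, 1)",
        (task_id,),
    )
    conn.commit()


def run_once(conn, task_id, work):
    """Run work() unless task_id is already done; return True if it ran."""
    if is_done(conn, task_id):
        return False  # redelivered message: skip the work
    work()
    mark_done(conn, task_id)
    return True
```

Inside a bound Celery task, `self.request.id` could probably serve as the unique ID, and the store would need to live somewhere that survives a worker restart (a file path or a server, not `:memory:`).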

You'll want to manually acknowledge the task after it's done. This stops it from going back to the queue. You can use something like message.ack() in your task code, where message is the task message object.

So, with this setup, even if the worker shuts down and the task comes back to the queue, the storage will show it's done and the task won't run again. Hope this helps!

donkihott

You can use worker_max_tasks_per_child. As noted in the Celery docs, it sets the maximum number of tasks a pool worker process can execute before it is replaced with a new one. If you set this in the config file:

worker_max_tasks_per_child = 1

Each worker process exits after completing one task, and a new one is started for the next task.

You can also specify how many messages each worker reserves from the queue. For that, use worker_prefetch_multiplier. If you set this in the config file:

worker_prefetch_multiplier = 1

Each worker process reserves only one message at a time.
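
Put together with the settings mentioned in the question, the config module might look roughly like this. The file name `celeryconfig.py` is an assumption, and `task_acks_late` and `worker_concurrency` are the config-file names for the `acks_late` and concurrency options from the question.

```python
# celeryconfig.py (assumed file name)

# Replace each pool worker process after it has executed one task.
worker_max_tasks_per_child = 1

# Reserve only one message per worker process at a time.
worker_prefetch_multiplier = 1

# Settings the question already uses: one process, acknowledge after the task runs.
worker_concurrency = 1
task_acks_late = True
```

A module like this is usually loaded with `app.config_from_object("celeryconfig")`.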

Also, if you need to run any setup before a worker process starts, or cleanup before it exits, you can use @worker_process_init.connect and @worker_process_shutdown.connect.

For example.

Source: Celery Doc.

Yavuz BALI