I would like to add a Celery periodic task based on a user's API request, from which I can get the time and date. Is there any way to trigger celery beat through an API call?

Gears

1 Answer

Solution 1

If all you need is to execute the task once at the date/time received from the API request, you don't need to run celery beat. All you need is to set the task's ETA when you enqueue it.

The ETA (estimated time of arrival) lets you set a specific date and time that is the earliest time at which your task will be executed.

tasks.py

from celery import Celery

app = Celery("my_app")

@app.task
def my_task(param):
    print(f"param {param}")

Logs (Producer)

>>> from dateutil.parser import parse
>>> import tasks
>>> 
>>> api_request = "2021-08-20T15:00+08:00"
>>> tasks.my_task.apply_async(("some param",), eta=parse(api_request))
<AsyncResult: 82a5710c-095f-49a2-9289-d6b86e53d4da>

Logs (Consumer)

$ celery --app=tasks worker --queues=celery --loglevel=INFO
[2021-08-20 14:58:18,234: INFO/MainProcess] Task tasks.my_task[82a5710c-095f-49a2-9289-d6b86e53d4da] received
[2021-08-20 15:00:00,161: WARNING/ForkPoolWorker-4] param some param
[2021-08-20 15:00:00,161: WARNING/ForkPoolWorker-4] 

[2021-08-20 15:00:00,161: INFO/ForkPoolWorker-4] Task tasks.my_task[82a5710c-095f-49a2-9289-d6b86e53d4da] succeeded in 0.0005905449997953838s: None

As you can see, the task executed at exactly 15:00 as requested by the mocked API request.
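Since the ETA comes straight from a user's request, it may be worth validating it before enqueueing. The following is a minimal sketch using only the standard library; `parse_eta` is a hypothetical helper name (not part of Celery), and rejecting naive or past timestamps is an assumed policy, not something the original setup requires.

```python
from datetime import datetime, timezone


def parse_eta(raw, now=None):
    """Turn an ISO 8601 string from the API request into a UTC datetime.

    Hypothetical helper: raises ValueError for timestamps that carry no
    UTC offset or that are not in the future.
    """
    eta = datetime.fromisoformat(raw)
    if eta.tzinfo is None:
        # Reject naive timestamps rather than guessing the caller's timezone.
        raise ValueError("timestamp must include a UTC offset")
    # `now` can be injected for testing; otherwise use the current UTC time.
    now = now or datetime.now(timezone.utc)
    if eta <= now:
        raise ValueError("timestamp must be in the future")
    # Normalize to UTC so every scheduled task is expressed in one timezone.
    return eta.astimezone(timezone.utc)
```

The returned value could then be passed in place of the dateutil result, e.g. `tasks.my_task.apply_async(("some param",), eta=parse_eta(api_request))`.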

Solution 2

If you need to execute the task periodically based on the API request, e.g. every minute starting from the time indicated, then you do have to run celery beat (note that this is a separate process that runs alongside the worker). But this is a dynamic update: you need to add a new task at runtime without restarting Celery. Simply adding a new entry to the schedule won't work, because the running scheduler wouldn't pick it up. You can't update the celerybeat-schedule file either. That file holds the info about the scheduled tasks and is read by the celery beat scheduler from time to time, but it is locked while the scheduler is running.

To solve this, you have to replace the celerybeat-schedule file with a database, so that the schedule can be updated even while the celery beat scheduler is running. This way, if you add a new scheduled task to the database at runtime, the celery beat scheduler will see it and execute it accordingly, without any restart.
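To illustrate why a database-backed schedule makes runtime additions possible, here is a toy sketch using only the standard library. It is not Celery code: the table layout and the function names (`create_schedule_store`, `add_periodic_task`, `tick`) are all hypothetical, and a real deployment would rely on a database-backed beat scheduler (django-celery-beat is one such package) rather than a hand-written loop. The point is only that the scheduler re-reads the table on every iteration, so a row inserted by an API handler is picked up without a restart.

```python
import sqlite3


def create_schedule_store():
    """Create an in-memory table that plays the role of the schedule database."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE periodic_task ("
        " name TEXT PRIMARY KEY,"
        " every_seconds INTEGER NOT NULL,"
        " next_run REAL NOT NULL)"
    )
    return db


def add_periodic_task(db, name, every_seconds, start_at):
    """What an API handler would call: insert a row while the scheduler runs."""
    db.execute(
        "INSERT INTO periodic_task VALUES (?, ?, ?)",
        (name, every_seconds, start_at),
    )
    db.commit()


def tick(db, now):
    """One scheduler iteration: re-read the table and return the due task names."""
    due = db.execute(
        "SELECT name, every_seconds, next_run FROM periodic_task"
        " WHERE next_run <= ? ORDER BY name",
        (now,),
    ).fetchall()
    for name, every_seconds, next_run in due:
        # Advance the row so the task becomes due again one interval later.
        db.execute(
            "UPDATE periodic_task SET next_run = ? WHERE name = ?",
            (next_run + every_seconds, name),
        )
    db.commit()
    return [name for name, _, _ in due]
```

In this sketch, a task added with `add_periodic_task(db, "my_task", every_seconds=60, start_at=100)` after the loop has already started is returned by the next `tick` whose `now` reaches 100, and then again 60 seconds later.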