I'd like to add a Celery periodic task based on a user's API request, from which I get the date and time. Is there any way to trigger celery beat through an API call?
-
Should the execution just happen once at the provided time from the API request? Or should it be repeated e.g. for every minute? – Niel Godfrey Pablo Ponciano Aug 20 '21 at 05:33
1 Answer
Solution 1
If all you need is to execute the task once at the date/time received from the API request, you don't need to run celery beat. All you need is to set the task's estimated time of arrival (ETA).
The ETA (estimated time of arrival) lets you set a specific date and time that is the earliest time at which your task will be executed.
tasks.py
from celery import Celery

app = Celery("my_app")

@app.task
def my_task(param):
    print(f"param {param}")
Logs (Producer)
>>> from dateutil.parser import parse
>>> import tasks
>>>
>>> api_request = "2021-08-20T15:00+08:00"
>>> tasks.my_task.apply_async(("some param",), eta=parse(api_request))
<AsyncResult: 82a5710c-095f-49a2-9289-d6b86e53d4da>
Logs (Consumer)
$ celery --app=tasks worker --queues=celery --loglevel=INFO
[2021-08-20 14:58:18,234: INFO/MainProcess] Task tasks.my_task[82a5710c-095f-49a2-9289-d6b86e53d4da] received
[2021-08-20 15:00:00,161: WARNING/ForkPoolWorker-4] param some param
[2021-08-20 15:00:00,161: WARNING/ForkPoolWorker-4]
[2021-08-20 15:00:00,161: INFO/ForkPoolWorker-4] Task tasks.my_task[82a5710c-095f-49a2-9289-d6b86e53d4da] succeeded in 0.0005905449997953838s: None
As you can see, the worker received the task at 14:58 but executed it at 15:00, as requested by the mocked API request.
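Since the ETA comes straight from user input, it may help to validate it before queuing the task. Below is a minimal sketch using only the standard library. The helper name eta_from_request is hypothetical, and rejecting naive or past timestamps is my own assumption about what the API should do, not something Celery requires.

```python
from datetime import datetime, timezone


def eta_from_request(timestamp, now=None):
    """Parse an ISO 8601 timestamp into a timezone-aware UTC datetime.

    Hypothetical helper: raises ValueError for naive or past timestamps.
    """
    eta = datetime.fromisoformat(timestamp)
    if eta.tzinfo is None:
        # Without an offset we cannot tell which instant the user meant.
        raise ValueError("timestamp must include a UTC offset")
    now = now or datetime.now(timezone.utc)
    if eta <= now:
        raise ValueError("timestamp must be in the future")
    return eta.astimezone(timezone.utc)


# "now" is pinned here only to make the example reproducible.
fixed_now = datetime(2021, 8, 20, 6, 58, tzinfo=timezone.utc)
eta = eta_from_request("2021-08-20T15:00+08:00", now=fixed_now)
print(eta.isoformat())  # 2021-08-20T07:00:00+00:00
```

The returned datetime can then be passed as eta= to tasks.my_task.apply_async, in place of the parse(api_request) call in the producer log.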
Solution 2
If you need to execute the task periodically based on the API request, e.g. every minute starting from the indicated time, then you do have to run celery beat (note that this is a separate process from the worker). But since this is a dynamic update, where a new task must be added at runtime without restarting Celery, you can't just add a new entry to the schedule configuration, because it wouldn't be picked up. Nor can you update the celerybeat-schedule file (the file holding the scheduled-task info, which the celery beat scheduler reads from time to time to execute scheduled tasks), because it is locked while the celery beat scheduler is running.
To solve this, replace the celerybeat-schedule file with a database-backed schedule store, so that the schedule can be updated even while the celery beat scheduler is running. This way, if you add a new scheduled task to the database at runtime, the celery beat scheduler will see it and execute it accordingly, without a restart.
- For the solution, since you are not in a Django application (where you could use django-celery-beat), I used celery-sqlalchemy-scheduler. You can see the detailed steps in my other answer posted here: https://stackoverflow.com/a/68858483/11043825
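To illustrate why a database-backed schedule can pick up new entries at runtime, here is a toy sketch using sqlite3 from the standard library. It is not how celery-sqlalchemy-scheduler or django-celery-beat is implemented; the table layout and the names add_schedule and tick are made up for this example. The idea is only that the scheduler re-reads the table on every pass, so rows inserted in between are seen without a restart.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE schedule ("
    "name TEXT PRIMARY KEY, every_seconds INTEGER, next_run REAL)"
)


def add_schedule(name, every_seconds, start_at):
    """Register a periodic entry; safe to call between scheduler passes."""
    conn.execute(
        "INSERT INTO schedule VALUES (?, ?, ?)", (name, every_seconds, start_at)
    )
    conn.commit()


def tick(now):
    """One scheduler pass: return due entry names and advance their next_run."""
    due = conn.execute(
        "SELECT name, every_seconds FROM schedule "
        "WHERE next_run <= ? ORDER BY name",
        (now,),
    ).fetchall()
    for name, every_seconds in due:
        conn.execute(
            "UPDATE schedule SET next_run = ? WHERE name = ?",
            (now + every_seconds, name),
        )
    conn.commit()
    return [name for name, _ in due]


first = tick(now=0)                      # nothing registered yet
add_schedule("report", 60, start_at=30)  # added at runtime, e.g. by an API handler
second = tick(now=30)                    # the new entry is picked up
third = tick(now=60)                     # not due again until 90
fourth = tick(now=90)
print(first, second, third, fourth)  # [] ['report'] [] ['report']
```

In a real setup the scheduler library handles the polling, and your API handler only needs to insert or update the schedule rows.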
