I tried everything I can found including:
stackoverflow
How to dynamically add / remove periodic tasks to Celery (celerybeat)
Can celery celerybeat dynamically add/remove tasks in runtime?
Github issue
How to dynamically add or remove tasks to celerybeat?
What I got from the above is if I only use celery and celery beat I have to restart the celery beat after I add/remove the tasks. But I don't have to restart it if I combine django-celery-beat.
I follow the docs step by step:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
My celeryconfig
BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER ="django_celery_beat.schedulers:DatabaseScheduler"
My celery beat run command
celery -A tasks beat -l info -S django
This works well, The tasks run as expected.After that, I wrote a script to add tasks at the runtime
import django
django.setup()
from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule
crontab = CrontabSchedule.objects.create(
minute='*/1',
hour='*',
day_of_week='*',
)
period = PeriodicTask.objects.create(
name='testfasd',
kwargs={},
crontab=crontab,
task='tasks.test',
)
setup_periodic_tasks(app)
When I took a look at the database, I got what I expected, new record as well as the last_update field had updated.And the logs in celery beat also proof that
[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
My question is although celery beat know the database had changed, but it still sending the old tasks and do not send the new task to the worker. Any idea?
Update
I using docker for my project, maybe it's related.