
I tried everything I could find, including:

Stack Overflow:

How to dynamically add / remove periodic tasks to Celery (celerybeat)

Can celery celerybeat dynamically add/remove tasks in runtime?

GitHub issue:

How to dynamically add or remove tasks to celerybeat?

What I got from the above is that if I only use Celery and Celery Beat, I have to restart celery beat after I add/remove tasks. But I shouldn't have to restart it if I use django-celery-beat.

I followed the docs step by step:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

My celeryconfig:

BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER ="django_celery_beat.schedulers:DatabaseScheduler"

My celery beat run command:

celery -A tasks beat -l info -S django

This works well; the tasks run as expected. After that, I wrote a script to add tasks at runtime:

import django
django.setup()
from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule


crontab = CrontabSchedule.objects.create(
    minute='*/1',
    hour='*',
    day_of_week='*',
)

period = PeriodicTask.objects.create(
    name='testfasd',
    kwargs='{}',  # PeriodicTask.kwargs is stored as a JSON-encoded string
    crontab=crontab,
    task='tasks.test',
)

setup_periodic_tasks(app)

When I took a look at the database, I got what I expected: a new record, and the last_update field had been updated. The celery beat logs also confirm it:

[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)

My question is: although celery beat knows the database has changed, it keeps sending the old tasks and never sends the new task to the worker. Any idea?

Update

I'm using Docker for my project; maybe it's related.

Windsooon
  • I tried to do the same and faced the same bug. Actually you and I are not alone. Here you can see more comments on related issue https://github.com/celery/django-celery-beat/issues/7 – Alexander Tyapkov Dec 26 '16 at 16:29
  • I just don't think this library works they way you want it to. The issue around this sort of thing is generally "life cycle" meaning that the application starts, consumes a config, then enters the ready state. The ready state is simple and consistent because its immutable. If you go adding and removing things it makes the engineering difficult(locks, semaphores, is this code reentrant? what do these philosophers even eat?, etc.) – nsfyn55 Dec 29 '16 at 14:39
  • Thanks nsfyn, so the only way I can do is restart it every time I change the tasks? – Windsooon Dec 29 '16 at 15:04
  • I can't say definitively, but it looks that way. I don't see anything in the documentation that would indicate your desired path is possible without considerable modifications to celery's application life cycle. – nsfyn55 Jan 01 '17 at 15:07

3 Answers


This GitHub issue:

[You can't add or delete tasks in celerybeat] currently, you have to restart beat.

No. To refresh tasks or task timings inside celery beat, you must restart the beat instance. The schedule is loaded into memory at startup, so to change or add a task you must restart the instance.
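To illustrate the point, here is a toy sketch (not Celery's actual code) of why an in-memory schedule goes stale: the entries are snapshotted once at construction, so later changes to the source are invisible to the running instance.

```python
# Toy illustration (NOT Celery's real scheduler): entries are copied
# into memory once, so later changes to the source dict are invisible.
class NaiveScheduler:
    def __init__(self, entries):
        self.entries = dict(entries)  # snapshot taken here

    def due_names(self):
        return sorted(self.entries)

source = {'add every 10': 10.0, 'add every 30': 30.0}
sched = NaiveScheduler(source)
source['new task'] = 60.0            # "added at runtime"
print(sched.due_names())             # ['add every 10', 'add every 30']
```

The scheduler keeps reporting only the two original entries; the new one never appears until a fresh instance is constructed, which is exactly what a restart does.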

You may consider using self-recurring tasks, with custom timings and conditional execution. Example:

from datetime import datetime, timedelta
from celery import shared_task

@shared_task
def check_conditions():
    # Do some db-level code
    if condition:
        # `eta` expects an absolute datetime (not a timedelta)
        check_conditions.apply_async(eta=datetime.utcnow() + timedelta(hours=6))

I use this in production and it performs well.
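One detail worth noting with this pattern: `apply_async` takes either `countdown` (seconds from now) or `eta` (an absolute datetime), so an "N hours from now" run time has to be computed explicitly. A minimal sketch, with the clock injectable for testing:

```python
from datetime import datetime, timedelta

# Compute an absolute `eta` for apply_async; `now` is injectable so the
# arithmetic can be checked without depending on the wall clock.
def next_eta(hours, now=None):
    now = now or datetime.utcnow()
    return now + timedelta(hours=hours)

base = datetime(2016, 12, 20, 12, 0, 0)
print(next_eta(6, base))  # 2016-12-20 18:00:00
```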

If you need to reload tasks, just programmatically restart celery beat:

@shared_task
def autoreload():
    if condition:
        # placeholder: shell out to restart beat via your process manager
        execute_shell_code_to_restart_celery()

I have not used this and cannot vouch for its usability, but it should theoretically work.
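One way the `execute_shell_code_to_restart_celery` placeholder might be filled in (purely a sketch: the systemd service name `celerybeat` is an assumption about your deployment, so adjust it to however beat is supervised):

```python
import subprocess

# Hypothetical helper: build the restart command separately so it can
# be inspected/tested without actually touching the system.
def restart_beat_cmd(service='celerybeat'):
    return ['systemctl', 'restart', service]

def execute_shell_code_to_restart_celery(service='celerybeat'):
    # check=True raises CalledProcessError if the restart fails
    subprocess.run(restart_beat_cmd(service), check=True)

print(restart_beat_cmd())  # ['systemctl', 'restart', 'celerybeat']
```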

This GitHub issue:

I have to reload beat to get this changes updated on worker ... using django-celery-beat ... This problem is still present on 4.0.2 and on master, tested [on December 21st, 2016].

Kris Molinari

Warning

If you change the Django TIME_ZONE setting, your periodic task schedule will still be based on the old timezone.

To fix that, you would have to reset the "last run time" for each periodic task:

from django_celery_beat.models import PeriodicTask, PeriodicTasks

PeriodicTask.objects.all().update(last_run_at=None)
for task in PeriodicTask.objects.all():
    PeriodicTasks.changed(task)
Mahdi Bahari

I was facing the exact same problem as you. The problem lies in the django-celery-beat package and the way its DatabaseScheduler is implemented. If you want to dynamically manage periodic tasks (add, delete, update) without restarting celery beat every time you change a task, and you are willing to use Redis, you can use celery-redbeat: https://pypi.org/project/celery-redbeat/ . If you want more control over your tasks, I've written a library based on it that works natively with Django: https://pypi.org/project/django-redbeat/
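For reference, switching to celery-redbeat is mostly a configuration change (a sketch; the Redis URL is an assumption about your setup):

```python
# celeryconfig additions for celery-redbeat (pip install celery-redbeat)
redbeat_redis_url = 'redis://localhost:6379/1'  # assumed Redis location
beat_scheduler = 'redbeat.RedBeatScheduler'
```

Beat can then be started with `celery -A tasks beat -S redbeat.RedBeatScheduler -l info`, and schedule entries live in Redis rather than in beat's memory.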