48

By default Celery send all tasks to 'celery' queue, but you can change this behavior by adding extra parameter:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example

Scheduler settings:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

Run worker:

python manage.py celery worker -c 1 -Q celery_periodic -B -E

This scheme doesn't work as expected: this workers sends periodic tasks to 'celery' queue, not 'celery_periodic'. How can I fix that?

P.S. celery==3.0.16

Artem Mezhenin
  • 5,539
  • 6
  • 32
  • 51

3 Answers3

64

Periodic tasks are sent to queues by celery beat where you can do everything you do with the Celery API. Here is the list of configurations that comes with celery beat:

https://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

In your case:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15,  # every 15 sec for test
        'options': {'queue' : 'celery_periodic'},  # options are mapped to apply_async options
    },
}
abhi shukla
  • 1,119
  • 1
  • 10
  • 19
  • Well, both answer the question in a way... However, I agree that this answer is slightly better than the accepted one because the question author asked for CELERYBEAT_SCHEDULE change... – DejanLekic Jul 14 '16 at 15:25
27

I found solution for this problem:

1) First of all I changed the way for configuring periodic tasks. I used @periodic_task decorator like this:

@periodic_task(run_every=crontab(minute='5'),
               queue='celery_periodic',
               options={'queue': 'celery_periodic'})
def recalc_last_hour():
    dt = datetime.utcnow()
    prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \
                - timedelta(hours=1)
    log.debug('Generating task for hour %s', str(prev_hour))
    recalc_hour.delay(prev_hour)

2) I wrote celery_periodic twice in params to @periodic_task:

  • queue='celery_periodic' option is used when you invoke task from code (.delay or .apply_async)

  • options={'queue': 'celery_periodic'} option is used when celery beat invokes it.

I'm sure, the same thing is possible if you'd configure periodic tasks with CELERYBEAT_SCHEDULE variable.

UPD. This solution correct for both DB based and file based storage for CELERYBEAT_SCHEDULER.

Artem Mezhenin
  • 5,539
  • 6
  • 32
  • 51
2

And if you are using djcelery Database scheduler, you can specify the queue on the Execution Options -> queue field

Mounir
  • 11,306
  • 2
  • 27
  • 34