28

I am using the following stack:

  • Python 3.6
  • Celery v4.2.1 (Broker: RabbitMQ v3.6.0)
  • Django v2.0.4.

According Celery's documentation, running scheduled tasks on different queues should be as easy as defining the corresponding queues for the tasks on CELERY_ROUTES, nonetheless all tasks seem to be executed on Celery's default queue.

This is the configuration on my_app/settings.py:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },

}

The tasks are just simple scripts for testing routing:

File app1/tasks.py:

from my_app.celery import app
import time


@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

File app2/tasks.py:

from my_app.celery import app
import time


@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)

When I run Celery with all the required queues:

celery -A my_app worker -B -l info -Q celery,queue1,queue2

RabbitMQ will show that only the default queue "celery" is running the tasks:

sudo rabbitmqctl list_queues
# Tasks executed by each queue:
#  - celery 2
#  - queue1 0
#  - queue2 0

Does somebody know how to fix this unexpected behavior?

Regards,

Ander
  • 5,093
  • 7
  • 41
  • 70

3 Answers3

34

I have got it working, there are few things to note here:

According Celery's 4.2.0 documentation, CELERY_ROUTES should be the variable to define queue routing, but it only works for me using CELERY_TASK_ROUTES instead. The task routing seems to be independent from Celery Beat, therefore this will only work for tasks scheduled manually:

app1_test.delay()
app2_test.delay()

or

app1_test.apply_async()
app2_test.apply_async()

To make it work with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE variable. The final setup of the file my_app/settings.py would be as follows:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },

}

And to run Celery listening on those two queues:

celery -A my_app worker -B -l INFO -Q queue1,queue2

Where

  • -A: name of the project or app.
  • -B: Initiates the task scheduler Celery beat.
  • -l: Defines the logging level.
  • -Q: Defines the queues handled by this worker.

I hope this saves some time to other developers.

Ander
  • 5,093
  • 7
  • 41
  • 70
  • 20
    Explanation of `CELERY_ROUTES` vs `CELERY_TASK_ROUTES` confusion: `CELERY_ROUTES` is the old celery setting name which has now been replaced by `task_routes`. However _celery_ settings in a _django_ settings file must be upper-case (e.g. `TASK_ROUTES`). To avoid conflict with other django settings it's recommended to prefix celery settings with `CELERY_` resulting in `CELERY_TASK_ROUTES`. This is loaded by doing something like: `app.config_from_object('django.conf:settings', namespace='CELERY')`. So `CELERY_TASK_ROUTES` is just an upper-cased-and-prefixed alteration of the new setting name. – sparrowt Jan 03 '19 at 13:27
  • for those who wondering about the celery beat, it is not independent of the task routing and it should work as well. – Reza Jul 20 '20 at 19:40
  • No I dont think we need to define that 'options' param in the settings file if you'll just remove the celery before the other queue names from the command so it will work just fine because you have already defined the CELERY_ROUTES in the settings.py file – Chetan Vashisth Nov 23 '20 at 08:11
  • 2
    This also works well in celery v5.0 , thanks – Ham Jan 29 '21 at 05:54
  • Love the answer! Works just fine for me in v4.4.0 in Jan 2022 and @sparrowt comment and explanation to use CELERY_TASK_ROUTES instead of CELERY_ROUTES was :chefskiss: :D – pyofey Jan 16 '22 at 13:59
25

adding queue parameter to the decorator may help you,

@app.task(queue='queue1')
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
JPG
  • 82,442
  • 19
  • 127
  • 206
  • 4
    Thanks @JPG, that would be a valid alternative, nonetheless I prefer to define the queues on the Django Setting files to get more flexibility. This way I can use different queue names depending on the environment: Testing, Staging, Production – Ander Sep 06 '18 at 14:29
  • 5
    I gave up with the configuration options, "celery_task_default_queue", "task_default_queue", "task_routes". None of these did anything. Only thing that worked was the decorator, thanks JPG. – Researcher Jan 03 '19 at 11:56
0

Okay as i have tried the same command that you have used to run the worker so I found that you just have to remove the "celery after the -Q parameter and that'll be fine too.

So the old command is

celery -A my_app worker -B -l info -Q celery,queue1,queue2

And the new command is

celery -A my_app worker -B -l info -Q queue1,queue2
Chetan Vashisth
  • 438
  • 4
  • 14
  • One should always specify the queue name and name of the worker. Eg. "celery -A my_app worker -Q my_queue,my_other_queue -P threads --task-events -c 40 -l INFO -B --scheduler django_celery_beat.schedulers:DatabaseScheduler -n celery@%h"" – TheZeke Jun 04 '21 at 18:07