9

I'm using Django and Celery and I'm trying to setup routing to multiple queues. When I specify a task's routing_key and exchange (either in the task decorator or using apply_async()), the task isn't added to the broker (which is Kombu connecting to my MySQL database).

If I specify the queue name in the task decorator (which will mean the routing key is ignored), the task works fine. It appears to be a problem with the routing/exchange setup.

Any idea what the problem could be?

Here's the setup:

settings.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}

tasks.py

from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)

Initiate task:

from tasks import my_important_task
my_important_task.delay()
Michael Waterfall
  • 20,497
  • 27
  • 111
  • 168
  • How do you pass routing_key? With async_apply? – mher May 23 '12 at 08:07
  • I'm using the `delay()` method, which is just a shortcut for `apply_async()`. I'm trying to keep the `routing_key` specification with the task method (via the decorator) instead of when it's called. I have tried passing the key using `apply_async()` instead but I'm getting the same problem. – Michael Waterfall May 23 '12 at 10:10
  • delay doesn't accept routing_key keyword. It is a simplified, version of apply_async but they are not the same. – mher May 23 '12 at 11:25
  • I'm not passing any routing information when starting the task, I'm specifying it in the task decorator, with the method definition. Please see the code above to see my setup. – Michael Waterfall May 23 '12 at 11:29
  • Am I right in thinking that I can just specify the routing/exchange info in the task decorator and it should be respected when called? – Michael Waterfall May 23 '12 at 12:58

1 Answers1

50

You are using the Django ORM as a broker, which means declarations are only stored in memory (see the, inarguably hard to find, transport comparison table at http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison)

So when you apply this task with routing_key important_task.update it will not be able to route it, because it hasn't declared the queue yet.

It will work if you do this:

@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
    print("IMPORTANT")

But it would be much simpler for you to use the automatic routing feature, since there's nothing here that shows you need to use a 'topic' exchange, to use automatic routing simply remove the settings:

  • CELERY_DEFAULT_QUEUE,
  • CELERY_DEFAULT_EXCHANGE,
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

And declare your task like this:

@task(queue="important")
def important_task():
    return "IMPORTANT"

and then to start a worker consuming from that queue:

$ python manage.py celeryd -l info -Q important

or to consume from both the default (celery) queue and the important queue:

$ python manage.py celeryd -l info -Q celery,important

Another good practice is to not hardcode the queue names into the task and use CELERY_ROUTES instead:

@task
def important_task():
    return "DEFAULT"

then in your settings:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}

If you still insist on using topic exchanges then you could add this router to automatically declare all queues the first time a task is sent:

class PredeclareRouter(object):
    setup = False

    def route_for_task(self, *args, **kwargs):
        if self.setup:
            return
        self.setup = True
        from celery import current_app, VERSION as celery_version
        # will not connect anywhere when using the Django transport
        # because declarations happen in memory.
        with current_app.broker_connection() as conn:
            queues = current_app.amqp.queues
            channel = conn.default_channel
            if celery_version >= (2, 6):
                for queue in queues.itervalues():
                    queue(channel).declare()
            else:
                from kombu.common import entry_to_queue
                for name, opts in queues.iteritems():
                    entry_to_queue(name, **opts)(channel).declare()
CELERY_ROUTES = (PredeclareRouter(), )
asksol
  • 19,129
  • 5
  • 61
  • 68
  • 2
    Is this issue with queue declarations and exchanges resolved in Celery 3? I'm using the new `CELERY_QUEUES = (Queue(...), ...)` in the settings, does this mean the queues are being declared properly? – Michael Waterfall Nov 06 '12 at 12:51
  • 8
    Note: In Celery 4.0 onwards, CELERY_ROUTES has been replaced with CELERY_TASK_ROUTES. Might save someone's time. – Vernon Gutierrez Aug 18 '17 at 12:42