1

I am trying to create a celery app using reverse topic exchange rabbitmq plugin from Alvaro Videla. The workers seems to connect with the broker fine using this exchange but when I topic-reverse-route my task, do not pick up the '#' or '*', works like direct exchange.

so thats my queue:

Queue(name='cluster', 
          exchange = Exchange(name='cluster', 
                              type='x-rtopic',
                              delivery_mode='persistent',
                              durable=True), 
          routing_key='intel.%d.%s' % (n_cores, hostname),
          durable = True,)

Now picture 2 workers using the following routing_key

  • Worker1 : intel.8.host1
  • Worker2 : amd.2.host2

Thats the routing_keys on the tasks I am trying and what I experienced:

Routing key     | Works?   |  Result              | Expected
-------------------------------------------------------------------------
'intel'         | OK       | Nobody receives      | 
'intel.*'       | OK       | Nobody receives      |
'intel.#'       | WRONG    | Everyone receives    | just Worker1 receives
'#.host1'       | WRONG    | Everyone receives    | just Worker1 receives
'intel.*.*      | WRONG    | Everyone receives    | just Worker1 receives
'intel.*.host1  | WRONG    | Everyone receives    | just Worker1 receives
'*.2.*'         | WRONG    | Everyone receives    | just Worker2 receives
'intel.8.host1' | OK       | like direct exchange | 

To try identify where was the problem, I've tested the plugin doing simple messaging using pika and just kombu as well and both worked fine, exactly as expected. So I figured must be a problem with the way Celery is exchanging (routing) the messages. Maybe I should create a custom routing class!?

Thanks in advance.

AndreLobato
  • 170
  • 1
  • 12
  • Perhaps you could take a look on the management plugin what are the bindings created by Celery – old_sound Aug 11 '14 at 13:33
  • When I looked closer at Celery's amqp implementation found out that it was creating automatically an direct Exchange instead the one I've setted when publishing the messages. Then I forced celery to use that exchange type and bumped on the py-amqp that does not support using others exchanges types. Maybe is just when using channels that occurs. Cheers – AndreLobato Aug 13 '14 at 06:55
  • I solve this by creating a MyRouter and creating queues with the same name of the routing_key (or has the routing key on it). The router redirects the tasks to the correct queue, exchange and routing key, then worked like a charm, plus I setted tasks to a default exchange when necessary. – AndreLobato Oct 03 '14 at 11:18

1 Answers1

0

After dig a while I found out that the Reverse topic exchange plugin works fine with Celery. I was misinterpreting the way how the Rabbitmq Queues works. To fully make it work I had to define a My Router where the task is routed to the exchange containing those queues, and only specifying the routing_key and the exchange name, that way the tasks would still round-robin the nodes connected to that exchange and be able to use wildcards on the tasks routing key.

So the queue settings would be something like this:

routed_queue = 'intel.8.pchost'

CELERY_QUEUES = (

    Queue(name='cluster.%s' % routed_queue,
          exchange = Exchange(name='cluster',
                              type='x-rtopic'), 
          routing_key=routed_queue),)

The router would be something like this:

class MyRouter(object):

def route_for_task(self, task, args=[], kwargs={}):

    routing_key = kwargs['routing_key'] if kwargs.has_key('routing_key') and\
                  kwargs['routing_key'] else '#'

    return {'exchange': 'cluster',
            'exchange_type': 'rtopic',
            'routing_key': routing_key}

Then I would pass the routing_key as a kwargs for the task, been able to set in the task "intel.#", meaning this task would be executed by any worker with queue starting with intel.

The only gotcha! here is that for some reason I had to execute the tasks using .apply_async rather then .delay.

The whole idea is to be able to route my tasks accordingly with the machine specs available in the cluster. Some tasks should only run on intel processors and others only amd, or define by number of cores in the node or using the hostnames.

Hope this can help anyone trying to do the same in the future.

AndreLobato
  • 170
  • 1
  • 12