
I am trying to create a consumer that would subscribe to multiple queues, and then process messages as they arrive.

The problem is that when there is some data already present in the first queue, it consumes the first queue and never goes to consume the second queue. However, when the first queue is empty, it does go to the next queue, and then consumes both queues simultaneously.

I first implemented this with threading, but I would rather avoid it since the pika library can handle this for me without much complexity. Below is my code:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print(body)
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
user3295878
  • I tried your code with the only change of adding a logger to prevent exceptions, and declaring the queues. The code works as expected. I published some messages to each queue and the messages got routed and echoed on the CLI – old_sound Jul 02 '14 at 14:12
  • Hey, can you try with pre-populated queues, and then start the consumer. Let me know if this also works as expected. – user3295878 Jul 02 '14 at 14:15
  • I just tried that and it doesn't work. I only see the messages from the first queue. – old_sound Jul 02 '14 at 14:40
  • That's what I'm talking about. Weird, isn't it? Do you have any ideas? – user3295878 Jul 02 '14 at 14:42
  • I don't know much about the python client, that's why I asked Gavin below to answer – old_sound Jul 03 '14 at 11:52
  • Does it behave the same with other clients? Can you give it a try in any other client? If this is pika specific, it will have to be raised as an issue. Though Gavin gave a good suggestion, it was already implemented. – user3295878 Jul 03 '14 at 11:54
  • I just tried with the `php-amqplib` client and it works as expected. I pre publish messages to both queues and then all of them are consumed. – old_sound Jul 03 '14 at 12:01
  • Good to know. I was about to raise the issue, when I found out Gavin is the author of pika. Now, it seems it's upon Gavin to help me. – user3295878 Jul 03 '14 at 12:06
  • Possible duplicate of [Python and RabbitMQ - Best way to listen to consume events from multiple channels?](http://stackoverflow.com/questions/28550140/python-and-rabbitmq-best-way-to-listen-to-consume-events-from-multiple-channel) – Vineet Menon Nov 03 '15 at 11:24

3 Answers


One possible solution is to use a non-blocking connection (`pika.SelectConnection`) to consume the messages.

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(queue='queue1', on_message_callback=callback)
    channel.basic_consume(queue='queue2', on_message_callback=callback)


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

This subscribes to both queues on a single channel and consumes messages from each as they arrive.
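
The path component of the AMQP URL in the code above names the virtual host, and the default virtual host `/` has to be percent-encoded as `%2F`. As a rough sketch of where that value comes from, the snippet below rebuilds the same URL with the standard library. The helper name `build_amqp_url` is made up for illustration and is not part of pika.

```python
from urllib.parse import quote


def build_amqp_url(user, password, host, port, vhost="/"):
    """Build an AMQP URL, percent-encoding the virtual host name."""
    # safe="" forces "/" itself to be encoded; by default quote() leaves it alone.
    encoded_vhost = quote(vhost, safe="")
    return "amqp://{}:{}@{}:{}/{}".format(user, password, host, port, encoded_vhost)


print(build_amqp_url("guest", "guest", "localhost", 5672))
```

The resulting string can be passed to `pika.URLParameters` exactly as in the answer's code.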

Trevor Boyd Smith
Chillar Anand
  • Could you tell me the purpose of %2F at the end? – Rápli András Mar 13 '17 at 14:57
  • @RápliAndrás When connecting to rabbitmq, you need to specify virtualhost. The default host is `/`, which is escaped to `%2f`. – Chillar Anand Mar 13 '17 at 15:03
  • Note that this code did not work for me with Pika 1.1.0. The fix is to add `on_open_callback=` in the `on_open` method: `connection.channel(on_open_callback=on_channel_open)`, and `on_message_callback=` in the `on_channel_open` method: `channel.basic_consume(on_message_callback=callback, queue='queue1')` and `channel.basic_consume(on_message_callback=callback, queue='queue2')`. – StratocastFlo Feb 03 '21 at 15:08
  • Is there a way to define a priority between the queues? – ThunderPhoenix Apr 04 '21 at 12:31
  • FYI you can also do this with the pika blocking connection with one channel. [pika has had test code for this since 2015](https://github.com/pika/pika/commit/94f86187bf8cbd6cdc18a52f17c564fbb2c78169) . – Trevor Boyd Smith Sep 01 '22 at 18:57

The issue is most likely that the first `basic_consume` call issues a Basic.Consume and starts receiving messages from the pre-populated queue before the second call is issued. You might want to try setting the QoS prefetch count to 1, which stops RabbitMQ from sending you more than one unacknowledged message at a time.
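
For reference, a minimal sketch of that suggestion (which the question's code already applies), assuming the pika 1.x keyword arguments used in the first answer (`queue=`, `on_message_callback=`). The prefetch count is set on the channel before any consumer is registered, so the limit is in place when the first Basic.Consume is issued. The names `consume_from_queues` and `main` are illustrative, and whether this changes the behaviour with pre-populated queues has not been verified here.

```python
def consume_from_queues(channel, queue_names, callback, prefetch_count=1):
    """Set QoS first, then register one consumer per queue on the same channel."""
    # Limit unacknowledged deliveries before any Basic.Consume is sent.
    channel.basic_qos(prefetch_count=prefetch_count)
    tags = []
    for name in queue_names:
        tags.append(channel.basic_consume(queue=name, on_message_callback=callback))
    return tags


def callback(ch, method, properties, body):
    print(body)
    # Acknowledge so the broker can deliver the next message.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    # Imported here so the helper above can be exercised without pika or a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    consume_from_queues(channel, ["queue1", "queue2"], callback)
    channel.start_consuming()
```

Calling `main()` requires a reachable broker on `localhost` with both queues already declared.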

Gavin M. Roy

Following the comments on the first answer above, I got the same result with pika 1.1.0 using a separate callback per queue:

import pika


def queue1_callback(ch, method, properties, body):
    print(" [x] Received queue 1: %r" % body)


def queue2_callback(ch, method, properties, body):
    print(" [x] Received queue 2: %r" % body)


def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)


def on_channel_open(channel):
    # auto_ack=True: messages are acknowledged on delivery, so no basic_ack is needed
    channel.basic_consume('queue1', queue1_callback, auto_ack=True)
    channel.basic_consume('queue2', queue2_callback, auto_ack=True)


credentials = pika.PlainCredentials('u', 'p')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()
    # Restart the loop so the connection can finish closing cleanly
    connection.ioloop.start()
wovano
5demayo