-1

I'm new to rabbitMQ, and I'm trying to make a application where there will be 3 roles: two producers and one consumer. The consumer is related with two queues which related to the two producers. Each producer sends the message to queue with different frequency. What I need is that the consumer read alternatively from the two producers.

For example:

Producer 1: Send "Hello" every 2 seconds Producer 2: Send "World" every 5 seconds Consumer: Print whatever it receives

So the consumer is expected to print:

hello world hello world hello world ...

Since producer 1 send the message more frequently than producer 2, after the consumer have read from consumer 1, it needs to wait a little bit for the arrival of the message from producer 2 (that's the problem)

I tried to declare two queues for the producers and link them to the consumer but the consumer only prints somthing like:

hello hello world hello hello world

Thanks for the help!

Update: Here's my code

Producer 1:

import pika
import sys

message = 'hello'


credentials = pika.PlainCredentials('xxxx', 'xxxx)
connection =pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')

while True:
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    print('Sent message: {}'.format(message))
    connection.sleep(2)

connection.close()

Producer 2:

import pika
import sys

message = 'world'


credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()


channel.queue_declare(queue='world')

while True:
    channel.basic_publish(exchange='', routing_key='world', body=message)
    print('Sent message: {}'.format(message))
    connection.sleep(4)

connection.close()

Consumer 1:

import pika

def callback(ch, method, properties, body):
    print('Receive: {}'.format(body))


credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='world')

channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)
channel.basic_consume(on_message_callback=callback, queue='world', auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')


channel.start_consuming()
Jing Zhu
  • 13
  • 1
  • 4
  • If Producer1 prints every 2 seconds it will have printed "Hello" twice by the time Producer2 prints "World". – MrPromethee Jun 06 '19 at 15:58
  • 1
    A `consumer` cannot subscribe to more than one `queue` in `RabbitMQ` – bumblebee Jun 07 '19 at 05:08
  • @MrPromethee Yes that's what I'm having now. What I want is that aftre reading "Hello" from Producer 1, the consumer will wait for the "World" from producer 2, even though there's already several "Hello" in the queue of producer 1 – Jing Zhu Jun 07 '19 at 07:13
  • Does this answer your question? [Consume multiple queues in python / pika](https://stackoverflow.com/questions/24510310/consume-multiple-queues-in-python-pika) – Trevor Boyd Smith Sep 01 '22 at 19:02

1 Answers1

1

Since a consumer can only consume from a single queue, you will have to make sure that all messages are routed to this queue.

It is then up to the consumer to handle the messages. It would have to use the polling API to get a single messages. Depending on which consumer published each message, the consumer would have to act differentlty. It could keep a local store of messages coming from producer 1 that arrived before a message coming from producer 2 has been acted upon. The Cosumer would delay acting on messages it keeps in this store until a message coming from producer 2 has been acted upon. Only then would it take the first message from this store and act on it.

Edit:

In the code you've added to your question, you have a single channel (that's good) but two consumers, one for each call to channel.basic_consume. Both consumers use the same callback method callback. It is this method which would have to implement the logic I've described above.

  • Hello. According to most of the info I found it seems like that. But I tried to make the single-consumer-multiple-queue structure and it actually works. However it doesn't work as expected but at least it didn't break down. I add my code in my question. – Jing Zhu Jun 07 '19 at 07:34
  • Excuse me I'm a little confused. Why do I have two consumer in the single channel when I only run one consumer.py? (However I did check the consumer tags for the two queues in RabbiitMQ Management and there are two different consumer tags for the two queue I'm not sure if that means there's two consumers. But I don't understand why there's two consumers if that's the case. – Jing Zhu Jun 07 '19 at 07:55
  • 1
    You call `channel.basic_consume` *twice*. Each will create a consumer. –  Jun 07 '19 at 08:03