I'm new to rabbitMQ, and I'm trying to make a application where there will be 3 roles: two producers and one consumer. The consumer is related with two queues which related to the two producers. Each producer sends the message to queue with different frequency. What I need is that the consumer read alternatively from the two producers.
For example:
Producer 1: Send "Hello" every 2 seconds Producer 2: Send "World" every 5 seconds Consumer: Print whatever it receives
So the consumer is expected to print:
hello world hello world hello world ...
Since producer 1 send the message more frequently than producer 2, after the consumer have read from consumer 1, it needs to wait a little bit for the arrival of the message from producer 2 (that's the problem)
I tried to declare two queues for the producers and link them to the consumer but the consumer only prints somthing like:
hello hello world hello hello world
Thanks for the help!
Update: Here's my code
Producer 1:
import pika
import sys
message = 'hello'
credentials = pika.PlainCredentials('xxxx', 'xxxx)
connection =pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
while True:
channel.basic_publish(exchange='', routing_key='hello', body=message)
print('Sent message: {}'.format(message))
connection.sleep(2)
connection.close()
Producer 2:
import pika
import sys
message = 'world'
credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='world')
while True:
channel.basic_publish(exchange='', routing_key='world', body=message)
print('Sent message: {}'.format(message))
connection.sleep(4)
connection.close()
Consumer 1:
import pika
def callback(ch, method, properties, body):
print('Receive: {}'.format(body))
credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='world')
channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)
channel.basic_consume(on_message_callback=callback, queue='world', auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()