218

I am just starting to use RabbitMQ and AMQP in general.

  • I have a queue of messages
  • I have multiple consumers, which I would like to do different things with the same message.

Most of the RabbitMQ documentation seems to be focused on round-robin, i.e. where a single message is consumed by a single consumer, with the load being spread across the consumers. This is indeed the behavior I witness.

An example: the producer publishes to a single queue and sends a message every 2 seconds:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  // Publishing on the connection goes through the default exchange,
  // where the routing key is treated as the destination queue name.
  var sendMessage = function (connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);
    connection.publish(queue_name, encoded_payload);
  };

  // Send a numbered test message every 2 seconds
  setInterval(function () {
    var test_message = 'TEST ' + count;
    sendMessage(connection, "my_queue_name", test_message);
    count += 1;
  }, 2000);
});

And here's a consumer:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });

connection.on('ready', function () {
  connection.queue("my_queue_name", function (queue) {
    queue.bind('#'); // bind with a catch-all pattern
    queue.subscribe(function (message) {
      // message.data holds the raw body; decode it back into the payload
      var encoded_payload = unescape(message.data);
      var payload = JSON.parse(encoded_payload);
      console.log('Received a message:');
      console.log(payload);
    });
  });
});

If I start the consumer twice, I can see that the two consumers receive alternate messages in round-robin fashion. E.g., I'll see messages 1, 3, 5 in one terminal and 2, 4, 6 in the other.

My question is:

  • Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?

  • Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?

mikemaccana
  • 5
    I am no RabbitMQ expert. However, what you now have is called queue but what you want is topics, see this tutorial: http://www.rabbitmq.com/tutorials/tutorial-five-python.html, more on queues vs. topics: http://msdn.microsoft.com/en-us/library/windowsazure/hh367516.aspx –  May 16 '12 at 15:03
  • 2
    I believe he wants fanout actually though topics will work as well and will give more control later. – robthewolf May 16 '12 at 15:09
  • Thanks @UrbanEsc. Topics seems to solve the problem by having one message hit multiple queues, and therefore be consumed by each queues consumers. Which leans me further towards the multiple queue/single consumer scenario for my particular case. – mikemaccana May 16 '12 at 15:10
  • 4
    For 2018 (and even for 2016 and earlier) the answer is to use something like Kafka, IMO. – WattsInABox Jun 29 '18 at 17:07

13 Answers

168

Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?

No, not if the consumers are on the same queue. From RabbitMQ's AMQP Concepts guide:

it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers.

This seems to imply that round-robin behavior within a queue is a given, and not configurable. I.e., separate queues are required in order to have the same message ID handled by multiple consumers.
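To make the distinction concrete, here is a toy in-memory model. It is plain JavaScript, not RabbitMQ or any AMQP client: the `ToyQueue` and `ToyFanoutExchange` classes are invented purely for illustration, and the round-robin rotation is a simplification of how a broker dispatches to consumers.

```javascript
// Toy model only: these classes are made up for illustration and are not
// part of RabbitMQ or of any AMQP client library.
class ToyQueue {
  constructor() {
    this.consumers = [];
    this.next = 0;
  }
  subscribe(handler) {
    this.consumers.push(handler);
  }
  // Hand each message to exactly one consumer, rotating through them.
  deliver(message) {
    if (this.consumers.length === 0) return;
    const handler = this.consumers[this.next % this.consumers.length];
    this.next += 1;
    handler(message);
  }
}

class ToyFanoutExchange {
  constructor() {
    this.queues = [];
  }
  bind(queue) {
    this.queues.push(queue);
  }
  // Copy the message into every bound queue.
  publish(message) {
    this.queues.forEach((queue) => queue.deliver(message));
  }
}

const messages = [1, 2, 3, 4, 5, 6];

// Case 1: two consumers share one queue, so the messages are split between them.
const sharedA = [];
const sharedB = [];
const sharedQueue = new ToyQueue();
sharedQueue.subscribe((m) => sharedA.push(m));
sharedQueue.subscribe((m) => sharedB.push(m));
messages.forEach((m) => sharedQueue.deliver(m));

// Case 2: each consumer has its own queue bound to a fanout exchange,
// so both consumers see every message.
const ownA = [];
const ownB = [];
const exchange = new ToyFanoutExchange();
const queueA = new ToyQueue();
const queueB = new ToyQueue();
queueA.subscribe((m) => ownA.push(m));
queueB.subscribe((m) => ownB.push(m));
exchange.bind(queueA);
exchange.bind(queueB);
messages.forEach((m) => exchange.publish(m));

console.log('shared queue:', sharedA, sharedB);
console.log('own queues:  ', ownA, ownB);
```

In the shared-queue case the first consumer ends up with messages 1, 3, 5 and the second with 2, 4, 6, which matches what the question observed. With one queue per consumer, both lists contain all six messages.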

Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?

No, it's not: a single queue with multiple consumers, each handling the same message ID, isn't possible. Having the exchange route the message into two separate queues is indeed better.

As I don't require complex routing, a fanout exchange will handle this nicely. I didn't focus much on exchanges earlier because node-amqp has the concept of a 'default exchange', which lets you publish messages to a connection directly. Most AMQP messages, however, are published to a specific exchange.

Here's my fanout exchange, both sending and receiving:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", { type: 'fanout' }, function (exchange) {

    var sendMessage = function (exchange, payload) {
      console.log('about to publish');
      var encoded_payload = JSON.stringify(payload);
      // A fanout exchange ignores the routing key, so it is left empty
      exchange.publish('', encoded_payload, {});
    };

    // Receive messages. Every consumer that should see each message needs
    // its own queue, so use a different queue name per consumer.
    connection.queue("my_queue_name", function (queue) {
      console.log('Created queue');
      queue.bind(exchange, '');
      queue.subscribe(function (message) {
        console.log('subscribed to queue');
        var encoded_payload = unescape(message.data);
        var payload = JSON.parse(encoded_payload);
        console.log('Received a message:');
        console.log(payload);
      });
    });

    // Send a numbered test message every 2 seconds
    setInterval(function () {
      var test_message = 'TEST ' + count;
      sendMessage(exchange, test_message);
      count += 1;
    }, 2000);
  });
});
Aage
mikemaccana
  • 4
    fan out was clearly what you wanted. It would not help you here, but I thought I'd mention round-robin behaviour within a queue is configurable. `int prefetchCount = 1; channel.basicQos(prefetchCount);` This will allow each consumers to receive a message as soon as it's finished with the previous one. Instead of receiving alternating messages. Again doesn't solve your problem, but could be useful for people to know. example here [http://www.rabbitmq.com/tutorials/tutorial-two-java.html](http://www.rabbitmq.com/tutorials/tutorial-two-java.html) under Fair Dispatch – Ommit Jan 27 '15 at 22:35
  • 5
    To clarify: 'default exchange' is not node-amqp specific. It's general AMQP concept with following rules: when any message published to default exchange, the routing key (with which that message published) treated as queue name by AMQP broker. So it seems like you can publish to queues directly. But you are not. The broker simply bind each queue to default exchange with routing key equal to queue name. – Ruslan Stelmachenko Jun 01 '16 at 00:16
  • 2
    Is there any alternative to Apache activemq jms topics in rabbitmq where no queues are involved but rather multicasting? – pantonis Jun 26 '16 at 16:29
  • If same user login from multiple devices then message get only one device.How can be solved it or any idea please? – Rafiq Nov 21 '17 at 07:32
  • @Rafiq you should ask a question for this. – mikemaccana Nov 21 '17 at 14:06
  • @mikemaccana Please check it. It is my question. I am not clear how can be solve it. https://stackoverflow.com/questions/47492996/message-broadcast-when-same-user-login-from-multiple-devices-or-open-multiple-ta – Rafiq Nov 27 '17 at 09:39
  • This is late but for future reference, it is possible for each consumer to receive the same message, I was able to implement for the scenario that you described – Clint Jun 29 '21 at 14:52
  • @Clint add an answer with details then. – mikemaccana Jun 29 '21 at 14:53
  • 1
    sure thing, but right now I'm looking for a way to maintain 1 long running connection among multiple clients – Clint Jun 29 '21 at 14:56
  • Usually, if you want to send the exact same message to different consumers, you would use another tools that works with "pub/sub" messaging and topics! It's used a lot when you want to stream events for your apps to react on something. – MadJlzz Sep 16 '21 at 13:57
73

The last couple of answers are almost correct - I have tons of apps that generate messages that need to end up with different consumers so the process is very simple.

If you want multiple consumers for the same message, do the following.

Create multiple queues, one for each app that is to receive the message. In each queue's properties, bind a routing key (a "routing tag") to the amq.direct exchange. Change your publishing app to send to amq.direct using that routing key (not a queue name). The broker will then copy the message into each queue that has the same binding. Works like a charm :)

Example: let's say I generate a JSON string and publish it to the "amq.direct" exchange using the routing key "new-sales-order". I have a queue for my order_printer app that prints the order, a queue for my billing system that receives a copy of the order and invoices the client, a queue for a web archive system where I archive orders for historic/compliance reasons, and a queue for a client web interface where orders are tracked as other info comes in about an order.

So my queues are: order_printer, order_billing, order_archive and order_tracking. All have the binding key "new-sales-order" bound to them, so all 4 will get the JSON data.

This is an ideal way to send data without the publishing app knowing or caring about the receiving apps.
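A minimal sketch of that setup in Node.js, assuming the amqplib client (`npm install amqplib`) rather than the node-amqp library used in the question. The queue names and routing key come from the example above; the function names, the `AMQP_URL` environment variable, and the sample order are made up for illustration.

```javascript
// Sketch only. Queue names and routing key are from the example above;
// function names and the AMQP_URL variable are illustrative assumptions.
const EXCHANGE = 'amq.direct'; // built-in direct exchange
const ROUTING_KEY = 'new-sales-order';
const QUEUES = ['order_printer', 'order_billing', 'order_archive', 'order_tracking'];

// Declare one queue per receiving app and bind each with the same key.
async function bindOrderQueues(channel) {
  for (const name of QUEUES) {
    // Fails if the queue already exists with different properties.
    await channel.assertQueue(name, { durable: true });
    await channel.bindQueue(name, EXCHANGE, ROUTING_KEY);
  }
}

// Publish to the exchange with the routing key, never to a queue directly.
function publishOrder(channel, order) {
  const body = Buffer.from(JSON.stringify(order));
  return channel.publish(EXCHANGE, ROUTING_KEY, body, {
    contentType: 'application/json',
  });
}

async function main() {
  const amqp = require('amqplib'); // promise-based API
  const connection = await amqp.connect(process.env.AMQP_URL);
  const channel = await connection.createChannel();
  await bindOrderQueues(channel);
  publishOrder(channel, { orderId: 1001, item: 'widget' }); // sample data
  await channel.close();
  await connection.close();
}

// Only talk to a broker when AMQP_URL is set, e.g. amqp://localhost
if (process.env.AMQP_URL) {
  main().catch(console.error);
}
```

Because the publisher only knows the exchange and the routing key, adding a fifth receiving app should just mean declaring one more queue with the same binding.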

vzwick
z900collector
  • 1
    What if the consumers are dynamic? I want different instances of the same app receive the same message. Any alternatives? – Akash Jul 10 '22 at 21:07
45

Just read the RabbitMQ tutorial. You publish messages to an exchange, not to a queue; they are then routed to the appropriate queues. In your case, you should bind a separate queue for each consumer. That way, they can consume messages completely independently.

driushkin
9

Yes, each consumer can receive the same messages. Have a look at http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html for different ways to route messages.

I know they are for Python and Java, but it's good to understand the principles, decide what you are doing, and then find out how to do it in JS. It sounds like you want a simple fanout (tutorial 3), which sends the messages to all queues connected to the exchange.

The difference between what you are doing and what you want to do is basically that you are going to set up an exchange of type fanout. Fanout exchanges send all messages to all connected queues. Each queue will have a consumer that has access to all the messages separately.

Yes, this is commonly done; it is one of the features of AMQP.

robthewolf
  • great answer, except by 'is this commonly done?' I was referring to 'having each consumer receive the same messages' - which isn't commonly done (consumers on the same queue always round robin). Probably my fault for not being clear enough. – mikemaccana May 16 '12 at 16:43
  • Actually I would venture to say that it depends what you want to use it for. You have two basic choices pub/sub or work queues. Your original set up was a work queue but what you wanted was a fanout pub/sub. They point is that common usage here is totally dependent on what you want to do. – robthewolf May 16 '12 at 20:55
  • Sure but in a work queue, the same message (eg, the same message ID) is not handled by different consumers - it's implicitly round robin. Again this is probably my fault for not being clear enough. – mikemaccana May 17 '12 at 08:45
  • we appear to be talking at cross purposes here. – robthewolf May 17 '12 at 08:53
  • Sorry about the confusion. If there's some way of having a work queue where consumers on the same queue handle the same message ID, please point me to a reference. Otherwise I'll continue to believe what I've read elsewhere. – mikemaccana May 17 '12 at 09:39
  • No. However, you could try having a fanout exchange with multiple queues all receiving the same message. Then each queue has a set of consumers (workers) receiving the messages from the queue in a round robin. You could then get the same message processed as many times as you have queues, and the speed would be determined by how many consumers each queue has. – robthewolf May 17 '12 at 12:12
  • OK. It sounds like we're on the same track. – mikemaccana May 17 '12 at 14:37
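The layout suggested in the comments above (a fanout exchange feeding several queues, each queue drained by its own pool of workers) could be sketched roughly as follows. This assumes the amqplib client; the exchange name `events`, the `SERVICE_QUEUE` and `AMQP_URL` environment variables, and the `startWorker` helper are hypothetical.

```javascript
// Sketch only: the exchange name, environment variables and helper name
// are hypothetical. Assumes the amqplib client.
const EXCHANGE = 'events';

// Start one worker. Workers given the same serviceQueue share that queue
// round-robin; each distinct serviceQueue gets its own copy of every
// message published to the fanout exchange.
async function startWorker(channel, serviceQueue, handle) {
  await channel.assertExchange(EXCHANGE, 'fanout', { durable: false });
  await channel.assertQueue(serviceQueue, { durable: false });
  await channel.bindQueue(serviceQueue, EXCHANGE, ''); // fanout ignores the key
  await channel.prefetch(1); // at most one unacknowledged message at a time
  return channel.consume(serviceQueue, async (msg) => {
    if (msg === null) return; // the consumer was cancelled
    await handle(JSON.parse(msg.content.toString()));
    channel.ack(msg);
  });
}

async function main() {
  const amqp = require('amqplib'); // promise-based API
  const connection = await amqp.connect(process.env.AMQP_URL);
  const channel = await connection.createChannel();
  const queueName = process.env.SERVICE_QUEUE || 'billing';
  await startWorker(channel, queueName, async (payload) => {
    console.log('[%s] received:', queueName, payload);
  });
}

// Only talk to a broker when AMQP_URL is set, e.g. amqp://localhost
if (process.env.AMQP_URL) {
  main().catch(console.error);
}
```

Running two processes with the same `SERVICE_QUEUE` would split that queue's messages between them, while a process started with a different `SERVICE_QUEUE` would receive its own copy of everything.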
7

The send pattern is a one-to-one relationship. If you want to "send" to more than one receiver you should be using the pub/sub pattern. See http://www.rabbitmq.com/tutorials/tutorial-three-python.html for more details.

Peter Ritchie
3

RabbitMQ / AMQP: single queue, multiple consumers for same message and page refresh.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });
durai
2

As I understand it, your case is:

  • I have a queue of messages (your source for receiving messages; let's name it q111)

  • I have multiple consumers, which I would like to do different things with the same message.

Your problem is that when 3 messages arrive on this queue, message 1 is consumed by consumer A, while consumers B and C consume messages 2 and 3. What you need is a setup where RabbitMQ passes a copy of all three messages (1, 2, 3) to all three connected consumers (A, B, C).

While many configurations can achieve this, a simple way is the following two-step approach:

  • Use a dynamic rabbitmq-shovel to pick up messages from the desired queue (q111) and publish them to a fanout exchange (an exchange created and dedicated exclusively for this purpose).
  • Reconfigure your consumers A, B and C (which were listening to q111) to listen to this fanout exchange instead, each through its own exclusive, anonymous queue.

Note: with this approach, don't consume directly from the source queue (q111), as messages that have already been consumed won't be shovelled to your fanout exchange.
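As a rough sketch of step one, a dynamic shovel can be declared as a runtime parameter through the management HTTP API. This assumes the shovel and management plugins are enabled, that the fanout exchange already exists, and Node.js 18+ for the global `fetch`. The shovel name `q111-to-fanout`, the exchange name `q111_fanout`, and the `RABBITMQ_*` environment variables are placeholders.

```javascript
// Sketch only: shovel name, exchange name and environment variables are
// placeholders. Requires the rabbitmq_shovel and management plugins.

// Build the shovel definition: read from a queue, publish to an exchange.
function buildShovelDefinition(srcQueue, destExchange, uri = 'amqp://') {
  return {
    value: {
      'src-protocol': 'amqp091',
      'src-uri': uri, // 'amqp://' means the local broker
      'src-queue': srcQueue,
      'dest-protocol': 'amqp091',
      'dest-uri': uri,
      'dest-exchange': destExchange, // assumed to exist already
    },
  };
}

// Declare the shovel on the default vhost ("/", URL-encoded as %2F).
async function createShovel(name, definition) {
  const base = process.env.RABBITMQ_API; // e.g. http://localhost:15672
  const credentials = `${process.env.RABBITMQ_USER}:${process.env.RABBITMQ_PASS}`;
  const url = `${base}/api/parameters/shovel/%2F/${encodeURIComponent(name)}`;
  const response = await fetch(url, {
    method: 'PUT',
    headers: {
      'content-type': 'application/json',
      authorization: `Basic ${Buffer.from(credentials).toString('base64')}`,
    },
    body: JSON.stringify(definition),
  });
  if (!response.ok) {
    throw new Error(`Shovel creation failed with HTTP ${response.status}`);
  }
}

// Only call the management API when RABBITMQ_API is set.
if (process.env.RABBITMQ_API) {
  createShovel('q111-to-fanout', buildShovelDefinition('q111', 'q111_fanout'))
    .catch(console.error);
}
```

Consumers A, B and C would then each bind their own exclusive queue to `q111_fanout` instead of subscribing to q111.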

If you think this does not satisfy your exact requirement, feel free to post your suggestions :-)

2

I think you should look into sending your messages through a fanout exchange. That way the same message is received by every consumer, because each new consumer/subscriber gets its own queue bound to the exchange.

Here is the link to the tutorial example in JavaScript: https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

Alejandro Serret
1

To get the behavior you want, simply have each consumer consume from its own queue. A fanout exchange is the simplest way to get the message to all of the queues at once; topic, headers, and direct exchanges will also copy a message to every queue whose binding matches.

Skylos
1

If you happen to be using the amqplib library as I am, it includes an example implementation of the Publish/Subscribe RabbitMQ tutorial which you might find handy.

brettjonesdev
1

There is one interesting option for this scenario that I haven't found in the answers here.

You can nack messages with the "requeue" feature in one consumer so that they are processed by another. Generally speaking this is not the right way, but maybe it will be good enough for someone.

https://www.rabbitmq.com/nack.html

And beware of loops (when all consumers nack and requeue the message)!
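A minimal sketch of that idea, assuming the amqplib client, where `channel.nack(message, allUpTo, requeue)` rejects a delivery. The `handleOrRequeue` helper and its `canHandle` and `work` callbacks are hypothetical names supplied by the caller.

```javascript
// Sketch only: handleOrRequeue, canHandle and work are hypothetical names.
// Assumes an amqplib channel consuming with acknowledgements enabled.
function handleOrRequeue(channel, msg, canHandle, work) {
  if (msg === null) return 'cancelled'; // the consumer was cancelled
  if (canHandle(msg)) {
    work(msg);
    channel.ack(msg);
    return 'acked';
  }
  // Reject only this message (allUpTo = false) and put it back on the
  // queue (requeue = true) so that another consumer can pick it up.
  channel.nack(msg, false, true);
  return 'requeued';
}

// Typical wiring (not executed here):
// channel.consume('my_queue_name', (msg) =>
//   handleOrRequeue(channel, msg, isForThisConsumer, processMessage));
```

If no consumer ever accepts a message it will circulate indefinitely, which is the loop the answer warns about. Redeliveries are flagged in `msg.fields.redelivered`, which can help when logging or debugging such cases.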

Alexus1024
  • 2
    I'd highly advice against this, as it does not scale by any means. There is no order for consumers, you cannot guarantee consumer B who will not requeue it, receives the message before consumer A who will process and requeue it, the mentioned loops are a problem. As you say "this is generally speaking not the right way", and I cannot think of a scenario where this would be better than the other answers. – Kevin Streicher Feb 23 '20 at 15:55
1

A fanout exchange is clearly what you want.

Read the RabbitMQ tutorial: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

Here's my example:

Publisher.js:

const amqp = require('amqplib'); // promise-based API

async function publish() {
  const connection = await amqp.connect('amqp://<user>:<pass>@<host>:<port>');
  console.log('RabbitMQ connected');
  // Declare the fanout exchange that the subscriber queues bind to
  const channel = await connection.createChannel();
  await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
  // A fanout exchange ignores the routing key, so it is left empty
  channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'));
}

publish().catch(console.error);

Subscriber.js:

const amqp = require('amqplib'); // promise-based API

// Print the body of each delivered message
function consumeMessage(msg) {
  if (msg !== null) console.log(' [x] %s', msg.content.toString());
}

async function subscribe() {
  const connection = await amqp.connect('amqp://<user>:<pass>@<host>:<port>');
  console.log('RabbitMQ connected');
  // Create a consumer queue and bind it to the fanout exchange
  const channel = await connection.createChannel();
  await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
  // '' asks the broker to generate a unique queue name for this subscriber
  const queue = await channel.assertQueue('', { exclusive: true });
  await channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '');

  console.log(' [*] Waiting for messages in %s. To exit press CTRL+C', queue.queue);
  await channel.consume(queue.queue, consumeMessage, { noAck: true });
}

subscribe().catch(console.error);

Here is an example I found on the internet that may also help: https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange

ofir_aghai
0

You just need to assign different groups to the consumers.

red