4

I have the following problem: I have a RabbitMQ cluster, a message producer, and a cluster of consumers (for high availability). Whenever a consumer receives a message, it spawns another process based on the message content and runs it. This process is long-running; it takes about 30 minutes.

I must ensure that messages are processed one at a time. However, there is more than one consumer, so if there are 2 messages in the queue, one consumer gets one message, a second consumer gets the other, and they are processed in parallel.

For reference: each consumer resides on a different machine.

Is there any mechanism at the RabbitMQ level that would let me hold off consuming the next message until the previous one has been ACKed? Or do I have to develop some locking mechanism between the servers?

Lorenzo Belli
Mat

2 Answers

2

I am just referring to a couple of points from the RabbitMQ docs that I think should satisfy your requirements.

  1. First ref - https://www.rabbitmq.com/consumer-priority.html

Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.

I assume all of your consumers have the same priority, so the messages will be evenly distributed across all active consumers.

  2. Second ref - https://www.rabbitmq.com/consumer-prefetch.html

the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming

As you mentioned, you will have a single consumer on each machine, so it's even easier.

Just set the consumer prefetch limit to 1 per consumer, so the server will deliver only one message to a consumer before requiring an acknowledgement. Then send the basic ack once your message is fully processed.

import com.rabbitmq.client.*;
import java.io.IOException;

Channel channel = ...;
channel.basicQos(1); // Per consumer limit: at most one unacknowledged message

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("received message");
        // process the message .. time consuming

        // after processing, send the basic ack so that the next message can be received from the queue
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};

// autoAck is false, so the broker waits for the explicit ack sent above
channel.basicConsume("my-queue", false, consumer);

I hope this helps.

Update -

Just to add a bit more description - When you use

channel.basicQos(x);

RabbitMQ will push (if messages are available in the queue, and subject to priority, round-robin, etc.) at most x unacknowledged messages to each consumer on the channel. That is, no consumer on the channel will hold more than x unacked messages, so a consumer can be processing at most x messages at any given moment. As soon as the consumer sends an ack, the next message can be pushed to it. A consumer can also send a nack if it cannot process the message. If the nack asks for a requeue, the message goes back to the queue and may then be delivered to any of the consumers on the queue, depending on priority, round-robin, etc.
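To make the per-consumer limit concrete, here is a minimal sketch in plain Java that only simulates the dispatch rule described above; it does not talk to RabbitMQ. The class `PrefetchSimulation`, the type `SimConsumer`, and the methods `dispatch` and `inFlight` are hypothetical names made up for this illustration, and the round-robin loop is a simplification of what the broker really does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of per-consumer prefetch; not the RabbitMQ client API.
public class PrefetchSimulation {

    /** A consumer with its own prefetch limit and its list of unacknowledged messages. */
    static final class SimConsumer {
        final String name;
        final int prefetch; // 0 means "unlimited", mirroring basic.qos
        final List<String> unacked = new ArrayList<>();

        SimConsumer(String name, int prefetch) {
            this.name = name;
            this.prefetch = prefetch;
        }

        boolean hasCapacity() {
            return prefetch == 0 || unacked.size() < prefetch;
        }
    }

    /** Pushes queued messages round-robin to every consumer that still has capacity. */
    static void dispatch(Deque<String> queue, List<SimConsumer> consumers) {
        boolean delivered = true;
        while (!queue.isEmpty() && delivered) {
            delivered = false;
            for (SimConsumer c : consumers) {
                if (!queue.isEmpty() && c.hasCapacity()) {
                    c.unacked.add(queue.poll());
                    delivered = true;
                }
            }
        }
    }

    /** Number of messages being processed at the same moment across all consumers. */
    static int inFlight(List<SimConsumer> consumers) {
        int total = 0;
        for (SimConsumer c : consumers) {
            total += c.unacked.size();
        }
        return total;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        List<SimConsumer> consumers =
                List.of(new SimConsumer("A", 1), new SimConsumer("B", 1));

        dispatch(queue, consumers);
        // prints "in flight: 2, still queued: 1"
        System.out.println("in flight: " + inFlight(consumers)
                + ", still queued: " + queue.size());

        // Consumer A acks its message; only then can it receive the next one.
        consumers.get(0).unacked.clear();
        dispatch(queue, consumers);
        // prints "A now holds: [m3]"
        System.out.println("A now holds: " + consumers.get(0).unacked);
    }
}
```

With two consumers that each have a prefetch of 1, two messages are in flight at once. The limit caps what each consumer holds, not what the queue hands out overall.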

Each channel can have multiple consumers. So, when you use

channel.basicQos(x, true);

The limit x applies to the whole channel rather than a single/each consumer on the channel.

In your case you have only one consumer on each channel, so the channel-wide limit does not change anything for you.

More Update -

A machine connects to RabbitMQ over a connection. A connection can have multiple channels, and a channel can have multiple consumers. So there can be different machines connected to RabbitMQ, each with multiple channels and consumers listening on the same queue. You can set a QoS limit on both the channel (using channel.basicQos(x, true)) and the consumers within the channel (using channel.basicQos(x, false)) at the same time. A limit of 0 means unlimited. These limits apply to a channel instance: consumers on different channel instances (on the same machine or on different machines) each have their own limit (either the default or the one set explicitly with the QoS method).
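As a rough worked example of how the two limits combine on one channel, the sketch below computes an upper bound on unacknowledged messages per channel. It is only arithmetic over the rules stated above, not a call into the RabbitMQ client; `QosLimits` and `maxUnackedOnChannel` are hypothetical names, and the formula (the smaller of the channel limit and the sum of the per-consumer limits) is an assumption drawn from this description.

```java
// Hypothetical helper that models the combined effect of the two basic.qos limits.
public class QosLimits {

    /** Treats 0 as "no limit", matching the basic.qos convention. */
    static long effective(int limit) {
        return limit == 0 ? Long.MAX_VALUE : limit;
    }

    /**
     * Upper bound on unacknowledged messages on a single channel.
     * Returns Long.MAX_VALUE when neither limit restricts the channel.
     */
    static long maxUnackedOnChannel(int perConsumerLimit, int perChannelLimit,
                                    int consumersOnChannel) {
        long perConsumer = effective(perConsumerLimit);
        long perChannel = effective(perChannelLimit);
        long sumOfConsumers = perConsumer == Long.MAX_VALUE
                ? Long.MAX_VALUE
                : perConsumer * consumersOnChannel;
        return Math.min(perChannel, sumOfConsumers);
    }

    public static void main(String[] args) {
        // Two consumers on one channel, 10 per consumer, 15 for the whole channel.
        System.out.println(maxUnackedOnChannel(10, 15, 2));

        // The setup from the question: each machine has one channel with one consumer
        // and a per-consumer limit of 1. Each channel is bounded separately, so two
        // machines can still hold one message each.
        long perMachine = maxUnackedOnChannel(1, 0, 1);
        System.out.println("two machines together: " + (2 * perMachine));
    }
}
```

The bound is computed per channel, which is why limits set on one machine say nothing about what another machine's channel may hold.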

vsoni
  • Thanks! Not sure if I understand it correctly. If I set QOS to 1, having two identical consumers, it will be impossible that two messages will be consumed at the same time? – Mat Dec 10 '17 at 13:23
  • Hi, sorry for the late reply. Thanks for all the explanation. I see now how the QOS works, but I'm trying to figure out how to connect many consumers to one channel (using Node.js). If I can accomplish that I will have the problem solved:) I will update about the progress – Mat Dec 30 '17 at 17:56
  • So from what I can see here: https://stackoverflow.com/questions/18418936/rabbitmq-and-relationship-between-channel-and-connection#18419417 it's impossible.. Channel is a "virtual connection" inside a TCP connection. I don't think I can make two different clients on two different machines using the same TCP connection;) So I think this is impossible and I will have to make some external locking:( – Mat Dec 30 '17 at 19:24
0

As @vsoni explained, in general we can use basic QoS, which limits consumption to X messages at a time per consumer or, more precisely, per channel. Since channels are "lightweight connections that share a single TCP connection" (see here), it is, to my knowledge, impossible to apply a QoS limit across two consumers running on two different machines. My goal was to not send a message to the second consumer while the first one is still processing the previous one. I only managed to achieve this with external locking (for development purposes I used a simple file lock and 3 consumers on one machine), but for production I will probably use DynamoDB-based locking. Another option is distributed locking with ZooKeeper, etcd, or similar software.
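For the development setup (several consumer processes on one machine), a file lock could look roughly like the sketch below. It is not the code used in this answer; `FileLockGuard`, `runExclusively`, and the lock file name `consumer.lock` are hypothetical, and it relies only on the standard `java.nio` file locking API.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical guard that lets only one process at a time run a job on this machine.
public class FileLockGuard {

    /**
     * Runs the job only if an exclusive lock on lockFile can be taken.
     * Returns true if the job ran, false if another process holds the lock.
     */
    static boolean runExclusively(Path lockFile, Runnable job) throws IOException {
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock(); // null when another process holds it
            if (lock == null) {
                return false;
            }
            try {
                job.run();
                return true;
            } finally {
                lock.release();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: all consumer processes agree on the same lock file path.
        Path lockFile = Path.of(System.getProperty("java.io.tmpdir"), "consumer.lock");
        boolean ran = runExclusively(lockFile,
                () -> System.out.println("processing one message"));
        System.out.println(ran ? "done" : "busy, another consumer is processing");
    }
}
```

A consumer that gets `false` back could nack the delivery with requeue so that another attempt is made later. Java file locks are held on behalf of the whole JVM, so this separates processes rather than threads, and it only helps when all consumers see the same file system. Consumers on different machines need a distributed lock such as the DynamoDB, ZooKeeper, or etcd options mentioned above.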

Mat
  • See this link - https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/ ... it might be helpful. But just trying to understand .... if you don't want other consumers (even ones residing on different machines) to process a message while one message is still being processed by a consumer, then why do you want multiple consumers? One consumer should be sufficient. – vsoni Dec 31 '17 at 17:46
  • It's for high availability, as I mentioned in the question. We have machines in two availability zones (AWS) and in case one "dies" we still want to be able to process the messages, but (unfortunately, because of the API limitations) we can't connect two clients at a time. But thanks for all the information, it was really helpful! – Mat Dec 31 '17 at 17:53