I am just referring a couple of points from rabbitmq docs that I thinks would able be able to satisfy your requirements.
- First ref - https://www.rabbitmq.com/consumer-priority.html
Normally, active consumers connected to a queue receive messages from
it in a round-robin fashion. When consumer priorities are in use,
messages are delivered round-robin if multiple active consumers exist
with the same high priority.
I assume all of your consumers have same priority, therefore the messages will be evenly distributed to all active consumers.
- Second ref - https://www.rabbitmq.com/consumer-prefetch.html
the basic.qos method to allow you to limit the number of
unacknowledged messages on a channel (or connection) when consuming
As you have mentioned you will have single consumer on one machine, its even easier.
Just set consumer prefetch limit 1 per consumer. So the server will deliver only one message to consumers before requiring acknowledgements. And send the basic ack once your message is fully processed.
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(1); // Per consumer limit
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("received message");
// process the message .. time consuming
// after processing send the basic ack so that next message can be received from queue
channel.basicAck(envelope.getDeliveryTag(), false);
};
channel.basicConsume("my-queue", false, consumer);
I hope this helps.
Update -
Just to add a bit more description -
When you use
channel.basicQos(x);
Rabbitmq will push (if available in the queue, of course after complying with priority and round robin etc.) at the max x number of unacked messages to each consumer on the channel. That means each consumer on the channel will not have more than x unacked messages at it, that is consumer can process at the max x messages simultaneously at any given moment. As soon as the consumer sends an ack back, the next message can be pushed to it. A consumer can also send a nack if it feels it is not able to process the message. In such case the message will be re-queued, and the re-queued message may go to any of the consumers on the queue depending on priority, round robin etc.
Each channel can have multiple consumers. So, when you use
channel.basicQos(x, true);
The limit x applies to the whole channel rather than a single/each consumer on the channel.
In your case you have only one consumer on each channel. So channel limit actually does not have any impact on your case.
More Update -
A machine connects to RabbitMQ by a connection. A connection can have multiple channels. And a channel can have multiple Consumers. So logically there can be different machines connected to RabbitMQ and having multiple channels and consumers listening on the same Queue. You can set QOS limit to both channel (using channel.basicQos(x, true)
) as well as consumers within the channel (using channel.basicQos(x, false)
) simultaneously. Limit 0 means unlimited. Apparently these limits are applied to channel instance. All the consumers residing on different channel instances (on same machine or different machine) will have their own limit (either default or if set explicitly by QOS method).