
I need help designing the distribution of RabbitMQ consumers.

For example, there are 100 queues and 10 threads consuming messages from them, so each thread consumes from 10 queues. Question 1: How can the threads be assigned to queues dynamically, and how does that work if the threads are running on different machines?

No more than one thread should consume from a given queue (to preserve the order in which that queue's messages are processed). Question 2: When more consumer threads are needed while the system is running, how can they be added?

user1182253
  • You can assign the thread to the consumer using an ExecutorService (http://www.rabbitmq.com/api-guide.html). What does "If the threads are running in different machines?" mean? I think you can solve this by using one ThreadPoolExecutor shared by all consumers and resizing it dynamically. The question is generic; there are a lot of ways to implement what you want. Off topic: I have already answered one of your questions (http://stackoverflow.com/questions/23333863/how-to-stop-consuming-message-from-selective-queue-rabbitmq); feedback would be appreciated. – Gabriele Santomaggio Apr 28 '14 at 17:22
  • Could you share an example? How do we ensure the order in which messages are processed if we delegate to an ExecutorService? "If the threads are running in different machines": for example, out of those 10 threads, 5 are on NodeA and 5 are on NodeB. – user1182253 Apr 29 '14 at 05:21

1 Answer


There are a lot of posts about message order (FIFO). In a normal situation (one producer and one consumer, with no network problems) you don't have any problem. But as you can read here:

In particular note the "unless the redelivered field is set" condition, which means any disconnect by consumers can cause messages pending acknowledgement to be subsequently delivered out of order.

Also, for example, if you publish a message and an error occurs during the publish, you have to republish the message in the correct order. This means that if you absolutely need message ordering, you have to implement it yourself, for example by marking each message with a sequence number, and you should also use publisher confirms.
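The sequence-number idea can be sketched on the consumer side as a small reordering buffer. This is only an illustration under assumptions: the class name `Resequencer`, the `String` payload, and the rule of silently dropping already-seen sequence numbers are all hypothetical choices, and it assumes the producer numbers messages per queue without gaps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: buffers out-of-order messages and releases them in sequence order. */
class Resequencer {
    private final Map<Long, String> pending = new HashMap<>();
    private long nextExpected;

    Resequencer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    /** Accepts one message and returns every message that is now deliverable in order. */
    synchronized List<String> accept(long sequence, String payload) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            // Already released earlier, e.g. a redelivery: ignore it.
            return ready;
        }
        pending.put(sequence, payload);
        // Release the contiguous run that starts at nextExpected.
        while (pending.containsKey(nextExpected)) {
            ready.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return ready;
    }
}
```

A message that arrives early stays in the buffer until the missing one shows up, so a lost message would stall the stream; a real implementation would probably need a timeout or a gap policy on top of this.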

I think, though this is only my opinion, that when you use a messaging system you shouldn't worry about message order, because your application should be able to manage the data.

Having said that, if we suppose that the 100 queues all handle the same kind of message, you could use one ThreadPoolExecutor and share it among all the consumers. For example:

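import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

public class ActualConsumer extends DefaultConsumer {
    // One pool shared by every consumer instance.
    private final ExecutorService mythreadPoolExecutorShared;

    public ActualConsumer(Channel channel, ExecutorService mythreadPoolExecutorShared) {
        super(channel);
        this.mythreadPoolExecutorShared = mythreadPoolExecutorShared;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        // MyMessage and MyHandleMessage (a Runnable) are application classes not shown here.
        MyMessage message = new MyMessage(body);
        mythreadPoolExecutorShared.submit(new MyHandleMessage(message));
    }
}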
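One caveat with a shared pool: two messages from the same queue can be picked up by different threads and finish out of order. If per-queue order matters, as the question requires, one possible variation is to pin each queue to a single-threaded executor. The sketch below is an assumption-laden illustration, not part of the RabbitMQ client: `OrderedDispatcher`, `laneFor`, and the choice of hashing the queue name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: routes every task for a given queue to the same single-threaded lane. */
class OrderedDispatcher {
    private final List<ExecutorService> lanes = new ArrayList<>();

    OrderedDispatcher(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Index of the lane that owns the queue; stable for a given queue name. */
    int laneFor(String queueName) {
        return Math.floorMod(queueName.hashCode(), lanes.size());
    }

    /** Tasks for one queue always run on one thread, in submission order. */
    void dispatch(String queueName, Runnable task) {
        lanes.get(laneFor(queueName)).execute(task);
    }

    /** Stops accepting work and waits for queued tasks; false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                if (!lane.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Inside `handleDelivery`, the call would then look something like `dispatcher.dispatch(queueName, new MyHandleMessage(message))`, where `queueName` is whatever name the consumer was registered for. This only orders work inside one JVM; across machines each queue still needs exactly one consuming node. The shared-pool consumer above remains the simpler option when order does not matter.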

With a shared pool, messages are balanced across the threads. For the thread pool you can also use different policies, for example static allocation with a fixed number of threads, or dynamic thread allocation.
Please read this post about resizing the thread pool (Can you dynamically resize a java.util.concurrent.ThreadPoolExecutor while it still has tasks waiting). You can apply this pattern on every node; in this way you balance message dispatching and assign a suitable number of threads.
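As a rough sketch of the resizing step, `ThreadPoolExecutor` exposes `setCorePoolSize` and `setMaximumPoolSize`, which can be called while tasks are queued or running. The wrapper class `ResizablePool` and its `resize` method are hypothetical names used only for illustration; the unbounded `LinkedBlockingQueue` is likewise an assumption.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper around a pool whose thread count can change at runtime. */
class ResizablePool {
    private final ThreadPoolExecutor executor;

    ResizablePool(int threads) {
        // With an unbounded queue, the core size is what determines the thread count.
        executor = new ThreadPoolExecutor(
                threads, threads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /** Changes the worker count; safe to call while tasks are waiting. */
    void resize(int threads) {
        if (threads > executor.getMaximumPoolSize()) {
            // Growing: raise the maximum first so the core size never exceeds it.
            executor.setMaximumPoolSize(threads);
            executor.setCorePoolSize(threads);
        } else {
            // Shrinking: lower the core size first for the same reason.
            executor.setCorePoolSize(threads);
            executor.setMaximumPoolSize(threads);
        }
    }

    ThreadPoolExecutor executor() {
        return executor;
    }
}
```

The order of the two setter calls matters on recent JDKs, where setting a core size above the current maximum (or a maximum below the current core size) throws `IllegalArgumentException`. When shrinking, surplus threads are not interrupted; they exit once they next become idle.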

I hope this is useful. I'd like to be more detailed, but your question is a bit generic.

Gabriele Santomaggio