0

I'm new to the RabbitMQ Java client. My problem: I created 10 consumers and attached them to the queue. Each consumer takes 10 seconds to process a message. On the RabbitMQ management page I saw that my queue had 4000 messages that had not been delivered to the client. The client log shows that one consumer receives one message, then 10 seconds later another consumer receives one message, and so on. I want all 10 consumers to receive a message at the same time (10 messages processed by 10 consumers concurrently). Please help me, I have not found a solution to this problem. Thanks a lot.

        while (!isRetry) {
        try {
            isRetry = true;
            connection = mConnectionFactory.newConnection(addresses.toArray(new Address[addresses.size()]));
            String queueName = "webhook_customer";
            String exchangeName = "webhook_exchange";
            String routingKey = "customer";
            System.out.println("step2");

            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "topic", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            channel.basicQos(1);
            for (int i = 0; i < numberWorker; i++) {
                Consumer consumer = new QueueingConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        long startProcess = System.nanoTime();
                        JSONObject profile = null;
                        try {

                        } catch (IOException ioe) {
                            handleLogError(profile, ioe.getMessage().toString());
                        } catch (Exception e) {
                            handleLogError(profile, e.getMessage());
                        } finally {
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            long endProcess = System.nanoTime();
                            _logger.info("===========######### TIME PROCESS  + " + (endProcess - startProcess) + " Nano Seconds  ========#### " + (endProcess - startProcess) / 1000000 + " Milli Seconds");
                        }
                    }
                };

                channel.basicConsume(queueName, false, consumer);
            }
            System.out.printf("Start Listening message ...");
        } catch (Exception e) {
            System.out.println("exception " + e.getMessage());
            isRetry = closeConnection(connection);
            e.printStackTrace();
        } finally {
        }
        if (!isRetry) {
            try {
                System.out.println("sleep waiting retry ...");
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //END
    }
TraiTran
  • Can you post a code example showing how you declare your consumer(s)? Do you use `channel.basicConsume` or another method? – Simon Oct 29 '17 at 15:35
  • I updated the code in my post, please check it and help me. Thanks a lot <3 – TraiTran Oct 29 '17 at 15:37
  • Possible duplicate of [RabbitMQ by Example: Multiple Threads, Channels and Queues](https://stackoverflow.com/questions/18531072/rabbitmq-by-example-multiple-threads-channels-and-queues) – Aleh Maksimovich Oct 29 '17 at 15:43

2 Answers

1

I found a solution for my case. When a message comes in, the consumer starts a new thread and processes the message in it. I also create multiple channels so that multiple messages can be handled at the same time, and I use a thread pool to control the threads.
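
The hand-off described here (pass each incoming message to a thread pool so the delivery callback returns immediately) can be sketched without a broker. This is a minimal simulation under stated assumptions, not the code from the post: `HandOffDemo`, `processConcurrently`, and `slowWork` are hypothetical names, and the loop stands in for the point where `handleDelivery` would be invoked in a real consumer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HandOffDemo {

    // Stand-in for the slow per-message work (10 seconds in the question).
    static void slowWork(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Hands every simulated message to a fixed-size pool and returns
    // the number of messages that were fully processed.
    static int processConcurrently(int messages, int workers, long workMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            // Assumption: in a real consumer this submit would be the whole
            // body of handleDelivery, so the callback returns right away.
            pool.submit(() -> {
                slowWork(workMillis);
                // A real consumer would acknowledge the delivery here,
                // after the work has finished.
                done.incrementAndGet();
            });
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int processed = processConcurrently(10, 10, 200);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(processed + " messages processed in about " + elapsedMs + " ms");
    }
}
```

With 10 workers and 10 messages, the elapsed time should be close to the time of a single message rather than ten times that. In a real consumer the prefetch limit still caps how many unacknowledged messages the broker will deliver, so the pool size and the `basicQos` value have to be chosen together.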

TraiTran
0

From your code sample, it seems that you could use the QueueingConsumer instead of DefaultConsumer. This will pull out more messages from RabbitMQ to the consumer(s) and queue them until they are processed.

Then, in your `for (int i = 0; i < 10; i++)` loop, you are consuming 10 times with the same consumer instance. You should instead create 10 consumers, like this:

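for (int i = 0; i < 10; i++) {
    // A fresh consumer instance for every basicConsume call
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Acknowledge only this delivery (multiple = false)
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };

    // autoAck = false: messages are acknowledged manually above
    channel.basicConsume(queueName, false, consumer);
}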

Ideally, create another class and properly create new instances instead of anonymous instances in the loop.

Note: Your consumers should do their processing in the background (on a separate thread); otherwise they will block each other. That said, the information you provided does not really show how you actually handle the messages.
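
To illustrate the blocking mentioned in the note: as far as I understand the Java client's API guide, callbacks for consumers that share one channel are dispatched one after another, so a slow callback holds up the others. The sketch below only models that behaviour with plain executors and does not talk to RabbitMQ; `DispatchDemo` and `maxInFlight` are hypothetical names, and a single-threaded executor is an assumed stand-in for the per-channel dispatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class DispatchDemo {

    // Runs `callbacks` slow callbacks on `dispatchThreads` threads and returns
    // the highest number of callbacks that were running at the same moment.
    static int maxInFlight(int dispatchThreads, int callbacks, long workMillis) {
        ExecutorService dispatcher = Executors.newFixedThreadPool(dispatchThreads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> pending = new ArrayList<>();
        for (int i = 0; i < callbacks; i++) {
            pending.add(dispatcher.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest overlap seen
                try {
                    Thread.sleep(workMillis); // stand-in for a slow handleDelivery body
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            }));
        }
        try {
            for (Future<?> f : pending) {
                f.get(); // wait for every callback to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("callback failed", e);
        } finally {
            dispatcher.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("one dispatch thread, peak in flight: " + maxInFlight(1, 5, 100));
        System.out.println("five dispatch threads, peak in flight: " + maxInFlight(5, 5, 100));
    }
}
```

With a single dispatch thread the peak is always 1, which matches the one-message-every-10-seconds pattern from the question. With several threads the peak is usually higher, which is the effect that moving the work to background threads or separate channels is meant to achieve.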

Simon
  • I used QueueingConsumer (it is deprecated) and modified my code, but it is still not working – TraiTran Oct 29 '17 at 15:58
  • RabbitMQ says: "By default, RabbitMQ will send each message to the next consumer not busy, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers." But it is not working for me, and I don't understand why. – TraiTran Oct 29 '17 at 16:44
  • This is because your consumers are not threaded. Look at the `Consumer thread pool` section at http://www.rabbitmq.com/api-guide.html – Simon Oct 29 '17 at 17:14