0

I have a consumer that consumes a message from a queue (manual ack), which will then rejected by calling Basic.Nack() or Basic.reject() method. After that, the consumer still consumes that rejected message.

How can I prevent the same consumer from consuming the rejected message? Instead, the rejected message should be redelivered to the different consumers?

UPDATE:

The message listener delegate

public class MessageListenerDelegate implements ChannelAwareMessageListener {
private MessageListener messageListener;

@Override
public void onMessage(Message message, Channel channel) throws Exception {

}

// setter and getter for MessageListener
....

}

public interface MessageListener {
void onMessage(MessageResourceHolder holder) throws Exception;

}

During app startup set up parallel consumers and start the consumer

 
public void listen(String queueName, MessageListener messageListener) {
    SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(connectionFactory);
    listener.setMessageListener(new MessageListenerAdapter(new MessageListenerDelegate(messageListener)));
    listener.addQueueNames(queueName);
    listener.setConcurrentConsumers(5);
    listener.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    listener.start();
}

And now listen on a queue, then call Basic.Nack() to reject a consumed message


listen(queueName, new MessageListener() {

            @Override
            public void onMessage(MessageResourceHolder holder) throws IOException {
                    holder.getChannel().basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);

            }
        }); 

The consumers are running in many threads, so it always consumes messages from the queue including the rejected messages

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
Hai Pham
  • 3
  • 2
  • Show us what you've tried so far; edit your post to include some code. I suggest reading this: [How do I ask a good question?](https://stackoverflow.com/help/how-to-ask) – Clonkex Jun 23 '17 at 05:46
  • thanks. Just updated the question including code – Hai Pham Jun 23 '17 at 07:05

1 Answers1

0

basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);

By setting the last argument to true, you are explicitly telling RabbitMQ to requeue the message.

/**
 * Reject one or several received messages.
 *
 * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
 * @see com.rabbitmq.client.AMQP.Basic.Nack
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param multiple true to reject all messages up to and including
 * the supplied delivery tag; false to reject just the supplied
 * delivery tag.
 * @param requeue true if the rejected message(s) should be requeued rather
 * than discarded/dead-lettered
 * @throws java.io.IOException if an error is encountered
 */
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
        throws IOException;
Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Thanks for your comment! Yes, I wanted to requeue the message, but I don't want the consumer consumes the requeued message again. How can we do that ? – Hai Pham Jun 25 '17 at 04:15
  • When you reject a message (with requeue=true), the broker will immediately redeliver it to an idle consumer. If you have many messages, you can increase the consumer prefetch count so it will be redelivered after any prefetched messages are processed; or you manually republish it to the tail of the queue as I discussed in [this answer](https://stackoverflow.com/questions/44704950/return-message-to-the-end-of-queue-in-rabbitmq/44706085#44706085). – Gary Russell Jun 25 '17 at 18:28
  • Got you, but the requeued message will also be consumed by the same consumer in bad case. – Hai Pham Jun 26 '17 at 10:57
  • I don't understand your issue; if you don't want a bad message redelivered then don't requeue it when rejecting it. – Gary Russell Jun 26 '17 at 12:35
  • Thanks a lot for your support! – Hai Pham Jun 27 '17 at 04:25
  • I am building a system in which I have several microservices. Other services will send messages to a service. For example, service A & B are sending messages to a service C concurrently, then C will send a response message to a queue where A & B are listening to. If a message is not the one A or B need, then the message will be requeued. The problem is that A or B will re-consume the rejected one. I think RabbitMQ is not able to resolve this situation. – Hai Pham Jun 27 '17 at 04:45
  • If you want the message redelivered after a delay, you can use a dead letter queue - reject the message, set a TTL on the dead letter queue and configure the dead letter queue to route expired messages back to the real queue (by configuring __it__ as the DLQ's dead-letter). You can check how many times it's been around the loop using the `x-death` header. See [this answer](https://stackoverflow.com/questions/27640358/spring-amqp-with-rabbitmq-message-is-not-circled-back-to-live-queue-after-falli/27641433#27641433), although the x-death header now has a count instead of growing indefinitely. – Gary Russell Jun 27 '17 at 12:50
  • My solution is that, a queue will be generated as a responded one for 2 services, so I don't care which message is the responed one for which service. Thanks a lot for your help. – Hai Pham Jun 27 '17 at 13:27