I have a consumer that takes a message from a queue (manual ack) and then rejects it by calling Channel.basicNack() or Channel.basicReject(). After that, the same consumer still receives the rejected message again.
How can I prevent the same consumer from consuming the message it rejected? Instead, the rejected message should be redelivered to a different consumer.
UPDATE:
The message listener delegate:

    public class MessageListenerDelegate implements ChannelAwareMessageListener {
        private MessageListener messageListener;

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
        }

        // setter and getter for MessageListener
        ....
    }

    public interface MessageListener {
        void onMessage(MessageResourceHolder holder) throws Exception;
    }
During app startup I set up parallel consumers and start the container:

    public void listen(String queueName, MessageListener messageListener) {
        SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(connectionFactory);
        listener.setMessageListener(new MessageListenerAdapter(new MessageListenerDelegate(messageListener)));
        listener.addQueueNames(queueName);
        listener.setConcurrentConsumers(5);
        listener.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listener.start();
    }
Then I listen on a queue and call basicNack() with requeue set to true to reject each consumed message:

    listen(queueName, new MessageListener() {
        @Override
        public void onMessage(MessageResourceHolder holder) throws IOException {
            // arguments: deliveryTag, multiple = false, requeue = true
            holder.getChannel().basicNack(holder.getMessageProperties().getDeliveryTag(), false, true);
        }
    });
The consumers run in several threads, so they keep consuming messages from the queue, including the ones that were rejected.
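To make the behaviour I am seeing concrete, here is a minimal in-memory sketch. It does not use RabbitMQ or Spring AMQP at all: `ToyQueue`, `simulate` and the round-robin delivery order are hypothetical stand-ins I made up for illustration, not the broker's real dispatch logic. The only point it tries to show is that a requeued message carries no record of which consumer rejected it, so it can come back to that consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RequeueDemo {

    /** Toy stand-in for a queue (assumption: not how the broker is implemented). */
    static final class ToyQueue {
        private final Deque<String> messages = new ArrayDeque<>();

        void publish(String body) {
            messages.addLast(body);
        }

        String deliver() {
            return messages.pollFirst();
        }

        /** With requeue = true the message goes back to the head; the rejecting consumer is not remembered. */
        void nack(String body, boolean requeue) {
            if (requeue) {
                messages.addFirst(body);
            }
        }
    }

    /**
     * Delivers one message round-robin to consumerCount consumers; every consumer
     * rejects it with requeue = true. Returns the consumer ids in delivery order.
     */
    static List<Integer> simulate(int consumerCount, int deliveries) {
        ToyQueue queue = new ToyQueue();
        queue.publish("order-1");
        List<Integer> receivedBy = new ArrayList<>();
        for (int i = 0; i < deliveries; i++) {
            int consumerId = i % consumerCount; // assumed round-robin, for illustration only
            String body = queue.deliver();
            receivedBy.add(consumerId);
            queue.nack(body, true);
        }
        return receivedBy;
    }

    public static void main(String[] args) {
        // 5 consumers as in setConcurrentConsumers(5); consumer 0 sees the message again
        System.out.println(simulate(5, 7)); // prints [0, 1, 2, 3, 4, 0, 1]
    }
}
```

In this toy model consumer 0 rejects the message and gets it back after one rotation, which matches what I observe with the real container, where the rejecting consumer eventually receives the same message again.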