
I am throwing an AmqpException inside of my consumer. My expectation is that the message will return back to the queue in FIFO order and will be reprocessed sometime in the future.

It seems that Spring AMQP does not release the message back to the queue but instead tries to reprocess the failed messages over and over again. This blocks newly arrived messages from being processed. The stuck messages remain in the "unacked" state indefinitely inside the AMQP console.

Any thoughts?

Gary Russell
Vladimir

2 Answers


That's the way RabbitMQ/Spring AMQP works: if a message is rejected (any exception is thrown), it is requeued by default and put back at the head of the queue, so it is retried immediately.

> ... reprocessed sometime in the future.

You have to configure things appropriately to make that happen.

First, you have to tell the broker to NOT requeue the message. That is done by setting defaultRequeueRejected on the listener container to false (it's true by default). Or, you can throw an AmqpRejectAndDontRequeueException which instructs the container to reject (and not requeue) an individual message.
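The two options above could look roughly like the following configuration sketch. It assumes `spring-rabbit` is on the classpath; the queue name `work.queue`, the class name, and the `process` method are hypothetical placeholders, and a message listener still has to be attached to the container.

```java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

final class NoRequeueSketch {

    private NoRequeueSketch() {
    }

    // Option 1: container-wide setting. Rejected messages are no longer requeued.
    static SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("work.queue");      // hypothetical queue name
        container.setDefaultRequeueRejected(false); // the default is true
        return container;
    }

    // Option 2: per-message decision inside the listener code.
    static void handle(String payload) {
        try {
            process(payload);
        } catch (RuntimeException e) {
            // Tells the container to reject this one message without requeueing it.
            throw new AmqpRejectAndDontRequeueException("processing failed", e);
        }
    }

    // Hypothetical business logic; stands in for the real consumer work.
    static void process(String payload) {
    }
}
```

Option 1 changes the behavior for every exception thrown by the listener, while option 2 leaves the default in place and opts out one message at a time.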

But that's not the end of it; just doing that will simply cause the rejected message to be discarded.

To avoid that, you have to set up a Dead Letter Exchange/Queue for the queue - rejected messages are then sent to the DLX/DLQ instead of being discarded. Using a policy rather than queue arguments is generally recommended.

Finally, you can set a message time to live on the DLQ so that, after that time, the message is removed from the queue. If you set up another appropriate dead letter exchange on that queue (the DLQ), you can cause the message to be requeued back to the original queue after the time expires.
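As a minimal sketch of that reject, wait, and redeliver cycle, the helper below builds the queue arguments for both queues. The argument keys (`x-dead-letter-exchange`, `x-dead-letter-routing-key`, `x-message-ttl`) are standard RabbitMQ queue arguments; the class, method, exchange, and routing key names are assumptions made for illustration. The same settings can be applied with a policy instead, which is the approach recommended above.

```java
import java.util.HashMap;
import java.util.Map;

final class RetryQueueArguments {

    private RetryQueueArguments() {
    }

    // Arguments for the original (work) queue: rejected messages go to the retry exchange.
    static Map<String, Object> workQueue(String retryExchange) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", retryExchange);
        return args;
    }

    // Arguments for the DLQ used as a delay queue: messages expire after delayMillis
    // and are then dead-lettered back to the exchange that feeds the work queue.
    static Map<String, Object> delayQueue(String workExchange, String workRoutingKey, int delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must not be negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", delayMillis);
        args.put("x-dead-letter-exchange", workExchange);
        args.put("x-dead-letter-routing-key", workRoutingKey);
        return args;
    }
}
```

These maps can be passed as the arguments parameter when declaring the queues, for example to the RabbitMQ Java client's `channel.queueDeclare(...)` or to the Spring AMQP `Queue` constructor that accepts an arguments map.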

Note that this will only work for rejected deliveries from the original queue; it will not work when expiring messages in that queue.

See this answer and some of the links from its question for more details.

You can use the contents of the x-death header to decide whether to give up completely after some number of attempts: catch the exception and somehow dispose of the bad message. If you don't throw an exception, the container will ack the message.
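A rough sketch of such a check follows. It assumes the header value arrives as a list of maps with `queue`, `reason`, and (on newer brokers) `count` entries; entries without a `count` are treated as one death each. The class and method names, and the queue name used in the usage note, are hypothetical.

```java
import java.util.List;
import java.util.Map;

final class XDeathCounter {

    private XDeathCounter() {
    }

    // Sums how often the message was dead-lettered from the given queue because it was rejected.
    static long rejectedCount(Object xDeath, String queue) {
        if (!(xDeath instanceof List<?>)) {
            return 0L; // header missing or in an unexpected shape: first delivery
        }
        long total = 0L;
        for (Object entry : (List<?>) xDeath) {
            if (!(entry instanceof Map<?, ?>)) {
                continue;
            }
            Map<?, ?> death = (Map<?, ?>) entry;
            // String.valueOf also covers non-String text values delivered by the client.
            boolean sameQueue = queue.equals(String.valueOf(death.get("queue")));
            boolean rejected = "rejected".equals(String.valueOf(death.get("reason")));
            if (sameQueue && rejected) {
                Object count = death.get("count");
                total += (count instanceof Number) ? ((Number) count).longValue() : 1L;
            }
        }
        return total;
    }

    // True once the message has been rejected from the queue at least maxAttempts times.
    static boolean shouldGiveUp(Object xDeath, String queue, long maxAttempts) {
        return rejectedCount(xDeath, queue) >= maxAttempts;
    }
}
```

In a listener, the header value could be read from the message headers and passed in, for example `XDeathCounter.shouldGiveUp(headers.get("x-death"), "work.queue", 5)`; when it returns true, handle or store the message and return normally so that it is acked.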

Gary Russell
  • I was hoping to use the Spring AMQP RetryOperationsInterceptor, which would kick in when the AmqpException is thrown. Once the maximum number of retry attempts is reached, a "Recoverer" would throw an AmqpRejectAndDontRequeueException, which would discard the message. I figured that between the retry attempts made by the retry interceptor (which could take a few hours with exponential backoff) the message would go back to the queue, allowing newly arrived messages to be processed. – Vladimir Feb 06 '15 at 04:32
  • No; the retry interceptor simply blocks the thread before delivering the message to the listener, so it is only suited for short delays between retries. You need to use the broker as I described to defer redeliveries. There is no other way for the consumer to tell the broker to wait a while before redelivering; it's just not part of the AMQP protocol. Even with JMS, such delays are proprietary to the broker and are not part of the JMS API. – Gary Russell Feb 06 '15 at 04:41
  • Great - thanks for your feedback. Will reconsider my approach. – Vladimir Feb 06 '15 at 05:07
  • Nice solution, though it requires some setting up. I still would like dead messages not to vanish forever, but be available in a dead letter queue for inspection. – Andrea Ratto May 30 '15 at 09:12
  • You can use the `x-death` header to detect the number of cycles and after some number, re-publish the message to some other queue. – Gary Russell May 30 '15 at 11:48

Here is the solution I used for this problem. I set up an interceptor that retries the message a configurable number of times while applying a backoff policy. http://trippstech.blogspot.com/2016/03/rabbitmq-deadletter-queue-with.html

trippstowe