My Scenario: I publish two messages to my Rabbit broker, and an unhandled exception occurs while processing the first message.
My Question: Why does the message remain Unack'd in the broker and as a consequence why is the second message not be dequeued and processed?
Some info: I am using Spring AMQP 1.5.4 with Spring Integration 4.2.4. (See code below) I have a Dead Letter Exchange set up and it is working as expected (i.e. When I Nack a message, it is forwarded to the DLX where it expires. It is then forwarded to the main Exchange).
What I want:
I would like unhandled exceptions (i.e. exceptions that are caught by the SimpleMessageListenerContainer
) to result in the amqp-message being Nack'd rather than remaining Unack'd.
What I see:
There are 3 retry attempts to process the message which of course fail because of my forced exception (see code below in ErrorHandler
).
The consumer tag
of the BlockingQueueConsumer
is the same so I'm guessing that the BlockingQueueConsumer is not restarted. However, the logs below show that it does continue to wait for messages.
I would like to know why the BlockingQueueConsumer does not nack the message and why subsequent message are not consumed despite the evidence in the logs that the Consumer is waiting for messages.
Any suggestions or background info would be very welcome!
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, Queue mainQueue, RetryOperationsInterceptor retryOperationsInterceptor) {
SimpleMessageListenerContainer retVal = new SimpleMessageListenerContainer(connectionFactory);
retVal.addQueues(mainQueue);
retVal.setAcknowledgeMode(AcknowledgeMode.MANUAL);
retVal.setDefaultRequeueRejected(false);
retVal.setAdviceChain(new Advice[]{retryOperationsInterceptor});
return retVal;
}
@Bean
public RetryOperationsInterceptor retryOperationsInterceptor () {
return stateless().recoverer(new RejectAndDontRequeueRecoverer()).build();
}
<int-amqp:inbound-channel-adapter
channel="fromRabbitChannel"
error-channel="errorChannel"
listener-container="simpleMessageListenerContainer"
/>
<int:service-activator ref="errorHandler" input-channel="errorChannel" method="handleError"/>
@MessageEndpoint
public class ErrorHandler {
public void handleError(Message<MessagingException> message) throws IOException {
throw new IllegalStateException("FORCED EXCEPTION");
}
}
09:49:38.219 [SimpleAsyncTaskExecutor-1] INFO c.p.a.f.ErrorHandler - Throwing an exception!!
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=3
09:49:38.220 [SimpleAsyncTaskExecutor-1] WARN o.s.a.r.r.RejectAndDontRequeueRecoverer - Retries exhausted for message (Body:'[B@c78ef32(byte[97])'MessageProperties [blah blah])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) [spring-rabbit-1.5.2.RELEASE.jar:na]
....
....
09:49:38.221 [SimpleAsyncTaskExecutor-1] WARN o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:44) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean$1.recover(StatelessRetryOperationsInterceptorFactoryBean.java:59) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean$1.recover(StatelessRetryOperationsInterceptorFactoryBean.java:53) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:124) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:458) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:320) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:168) ~[spring-retry-1.1.2.RELEASE.jar:na]
....
....
09:49:38.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0
09:49:39.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0