My Scenario: I publish two messages to my RabbitMQ broker, and an unhandled exception occurs while processing the first message.

My Question: Why does the message remain Unack'd in the broker and, as a consequence, why is the second message not dequeued and processed?

Some info: I am using Spring AMQP 1.5.4 with Spring Integration 4.2.4 (see code below). I have a Dead Letter Exchange set up and it is working as expected, i.e. when I Nack a message, it is forwarded to the DLX, where it expires. It is then forwarded back to the main Exchange.

What I want: I would like unhandled exceptions (i.e. exceptions that are caught by the SimpleMessageListenerContainer) to result in the amqp-message being Nack'd rather than remaining Unack'd.

What I see: There are 3 retry attempts to process the message, all of which fail because of my forced exception (see the ErrorHandler code below).

The consumer tag of the BlockingQueueConsumer stays the same, so I'm guessing that the BlockingQueueConsumer is not restarted. However, the logs below show that it does continue to wait for messages.

I would like to know why the BlockingQueueConsumer does not nack the message, and why subsequent messages are not consumed despite the evidence in the logs that the Consumer is waiting for messages.

Any suggestions or background info would be very welcome!

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, Queue mainQueue, RetryOperationsInterceptor retryOperationsInterceptor) {
    SimpleMessageListenerContainer retVal = new SimpleMessageListenerContainer(connectionFactory);
    retVal.addQueues(mainQueue);
    retVal.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    retVal.setDefaultRequeueRejected(false);
    retVal.setAdviceChain(new Advice[]{retryOperationsInterceptor});
    return retVal;
}

@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
    // stateless() is statically imported from RetryInterceptorBuilder
    return stateless().recoverer(new RejectAndDontRequeueRecoverer()).build();
}

<int-amqp:inbound-channel-adapter
    channel="fromRabbitChannel"
    error-channel="errorChannel"
    listener-container="simpleMessageListenerContainer"
    />

<int:service-activator ref="errorHandler" input-channel="errorChannel" method="handleError"/>

@MessageEndpoint
public class ErrorHandler {
    public void handleError(Message<MessagingException> message) throws IOException {
        throw new IllegalStateException("FORCED EXCEPTION");
    }
}

09:49:38.219 [SimpleAsyncTaskExecutor-1] INFO  c.p.a.f.ErrorHandler - Throwing an exception!!
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=3
09:49:38.220 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.r.RejectAndDontRequeueRecoverer - Retries exhausted for message (Body:'[B@c78ef32(byte[97])'MessageProperties [blah blah])
    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) [spring-rabbit-1.5.2.RELEASE.jar:na]
....
....
09:49:38.221 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:44) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean$1.recover(StatelessRetryOperationsInterceptorFactoryBean.java:59) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean$1.recover(StatelessRetryOperationsInterceptorFactoryBean.java:53) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:124) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:458) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:320) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:168) ~[spring-retry-1.1.2.RELEASE.jar:na]
....
....
09:49:38.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0
09:49:39.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0
Rob O'Doherty

1 Answer

retVal.setAcknowledgeMode(AcknowledgeMode.MANUAL);

With manual acks, you are responsible for acking or rejecting the message yourself. The container only acks/nacks when the mode is AUTO; in that mode it will do exactly what you require.
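
As a rough sketch of that change, the container bean from the question could be switched to AUTO as shown below. The class name ListenerContainerConfig is hypothetical, and the sketch assumes the mainQueue and retryOperationsInterceptor beans are defined as in the question. Any code that still acks or nacks on the channel itself would probably have to go, since the container now settles each delivery.

```java
import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

@Configuration
public class ListenerContainerConfig { // hypothetical class name, for illustration only

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            ConnectionFactory connectionFactory,
            Queue mainQueue,
            RetryOperationsInterceptor retryOperationsInterceptor) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.addQueues(mainQueue);
        // AUTO: the container acks when the listener returns normally
        // and rejects the message when the listener throws.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Rejected messages are not requeued, so the broker can dead-letter them.
        container.setDefaultRequeueRejected(false);
        container.setAdviceChain(new Advice[] {retryOperationsInterceptor});
        return container;
    }
}
```

With this in place, the exception thrown once retries are exhausted should lead to a reject without requeue, which lets the broker route the message to the DLX instead of leaving it Unack'd.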

Gary Russell
  • Thanks Gary ... That was all I needed to do. – Rob O'Doherty Mar 01 '16 at 09:34
  • @Gary, can you help here: http://stackoverflow.com/questions/42964342/sending-acknowledgment-to-rabbitmq-server-in-depends-on-converter-and-listener I read this thread, however spring-boot seems to be other approach for me –  Mar 23 '17 at 09:51