2

For stream-based services, I want the message to remain in the queue when the underlying service invoked within a @StreamListener fails. To that end, my understanding is that the only way to do this is to configure spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL.

After making this configuration change, I tried adding @Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag as method arguments to my existing @StreamListener implementation as documened at https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack. With this code in place, I encountered the following exception:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:941)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:851)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'amqp_channel' for method parameter type [interface com.rabbitmq.client.Channel]
    at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100)
    at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)

I then found the following: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples, which shows an example of how to perform acknowledgement of messages using Kafka, but I am currently using the RabbitMQ binding. We plan on eventually moving to Kafka, but for now, how do I configure and code a solution to do manual message acknowledgement for successfully processed messages and manual message rejection, thus leaving the message on the queue, when exceptions are encountered. I am currently on Spring Cloud Edgware.RELEASE and Spring Cloud Stream Ditmars.RELEASE.

UPDATE

Now I have the following configuration:

spring:
  cloud:
    stream:
      bindings:
        do-something-async-reply:
          group: xyz-service-do-something-async-reply
      rabbit:
        bindings:
          do-something-async-reply:
            consumer:
              autoBindDlq: true
              dlqDeadLetterExchange:
              dlqTtl: 10000
              requeueRejected: true

And I'm receiving the following error at service startup:

2018-01-12 14:46:34.346 ERROR [xyz-service,,,] 2488 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'do-something-async-reply.xyz-service-do-something-async-reply' in vhost '/': received the value 'DLX' of type 'longstr' but current is none, class-id=50, method-id=10)

What configuration is wrong/am I missing?

Keith Bennett
  • 733
  • 11
  • 25

1 Answers1

2

The property name is incorrect; you are missing .rabbit. It's

spring.cloud.stream.rabbit.bindings.<channel>.consumer.acknowledge-mode=MANUAL

since this is a rabbit-specific property - see the documentation.

EDIT

Example:

@SpringBootApplication
@EnableBinding(Sink.class)
public class So481977082Application {

    public static void main(String[] args) {
        SpringApplication.run(So481977082Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(AmqpHeaders.CHANNEL) Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println(in);
        Thread.sleep(60_000);
        channel.basicAck(tag, false);
        System.out.println("Ackd");
    }

}

Bear in mind that the need for MANUAL acks is often a smell; it's generally better to let the container handle the acks; see requeueRejected at the same doco link. Unconditionally requeueing can cause an infinite loop.

EDIT2

Works fine for me...

@SpringBootApplication
@EnableBinding(Processor.class)
public class So48197708Application {

    public static void main(String[] args) {
        SpringApplication.run(So48197708Application.class, args);
    }

    @Bean
    ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(new GenericMessage<>("foo"));
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Header(name = "x-death", required = false) List<?> death) {
        System.out.println(death);
        throw new RuntimeException("x");
    }

}

with

spring:
  cloud:
    stream:
      bindings:
        input:
          group: foo
          content-type: application/json
          destination: foo
          consumer:
            max-attempts: 1
        output:
          content-type: application/json
          destination: foo
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              dlqDeadLetterExchange:
              dlqTtl: 10000

Result:

null
...
Caused by: java.lang.RuntimeException: x
...
[{reason=expired, count=1, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
    {reason=rejected, count=1, exchange=foo, time=Fri Jan 12 17:20:18 EST 2018, routing-keys=[foo], queue=foo.foo}]
...

...
[{reason=expired, count=3, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
    {reason=rejected, count=3, exchange=foo, routing-keys=[foo], time=Fri Jan 12 17:20:18 EST 2018, queue=foo.foo}]
Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Gary, with requeueRejected set to true, will the message automatically get requeued when an exception is thrown from the `@StreamListener`? Is that all that needs to be done? – Keith Bennett Jan 11 '18 at 16:47
  • OK, I found some information about this at https://docs.spring.io/spring-cloud-stream/docs/Ditmars.SR2/reference/htmlsingle/#_retry_with_the_rabbitmq_binder. When the documentation states "Set the dlqDeadLetterExchange to the default exchange," what is the value for the default exchange? – Keith Bennett Jan 11 '18 at 17:26
  • 1
    Any exception except an `AmqpRejectAndDontRequeueException` will cause the message to be requeued, as long as retry is disabled. If retry is enabled, the binder will throw that exception when retries are exhausted. There is also `republishToDlq` where the binder will publish the failure, including headers containing information about the failure. The default exchange is an empty string. All queues are bound to that exchange with their queue name as the routing key. – Gary Russell Jan 11 '18 at 20:18
  • OK, I believe I have my configuration correctly defined according to https://docs.spring.io/spring-cloud-stream/docs/Ditmars.SR2/reference/htmlsingle/#_retry_with_the_rabbitmq_binder, but I'm receiving an error documented in the UPDATE to my original post. I've tried making some changes/additions, but I can't seem to figure out what I've specified incorrectly. – Keith Bennett Jan 12 '18 at 20:55
  • 1
    As it shows in [Putting it all together](https://docs.spring.io/spring-cloud-stream/docs/Ditmars.SR2/reference/htmlsingle/#_putting_it_all_together) you just need to use a blank value for the `dlqDeadLetterExchange` not ''. – Gary Russell Jan 12 '18 at 21:23
  • I removed the '' as reflected in the UPDATE, and I'm still getting the same error. I just added the `spring.cloud.stream.bindings...` configuration for reference purposes to my UPDATE as well. The error is stating that the problem is with the `group` channel (`xyz-service-do-something-async-reply`), which I want to send the message to. The configuration for that channel is in xyz-service.yml. – Keith Bennett Jan 12 '18 at 22:05
  • 1
    See my second edit for a working example. Since your queue exists already you have to delete it; you can't change arguments on an existing queue. – Gary Russell Jan 12 '18 at 22:26
  • Thanks for the example. I will take a look at it. There appears to be some sort of chicken/egg issue for me. After deleting the queue, the error no longer displays when starting the consumer service. However, when I start the producer service, that same error is displayed. The same queue name is referenced in the producer as follows: `spring.cloud.stream.bindings.do-something-async-reply.producer.required-groups: xyz-service-do-something-async-reply`. What's interesting is on the consumer side I was seeing `received the value 'DLX' of type 'longstr' but current is none` and now the opposite. – Keith Bennett Jan 12 '18 at 22:45
  • Now I'm seeing on the producer side `received none but current is the value 'DLX' of type 'longstr'`. – Keith Bennett Jan 12 '18 at 22:46
  • Gary, I just figured out that I need the following configurations on the producer side: `spring.cloud.stream.rabbit.bindings.do-something-async-reply.producer.auto-bind-dql:true`, `spring.cloud.stream.rabbit.bindings.do-something-async-reply.producer.dlq-dead-letter-exchange:`, and `spring.cloud.stream.rabbit.bindings.do-something-async-reply.producer.dlq.ttl: 10000`. No errors on either the consumer or producer side now. Thanks for your help! – Keith Bennett Jan 12 '18 at 23:08
  • Yes, if you have `required-groups` on the producer side (which forces consumer queue declaration), the queue binding configuration must match the consumer's configuration. It's generally best to decouple the consumer from the producer but `required-groups` is necessary if you might deploy a new producer before a new consumer. – Gary Russell Jan 13 '18 at 15:53