30

How can I acknowledge messages manually instead of using auto acknowledgement? Is there a way to do this with the @RabbitListener and @EnableRabbit style of configuration? Most of the documentation says to use SimpleMessageListenerContainer together with ChannelAwareMessageListener. However, with that approach we lose the flexibility that the annotations provide. I have configured my service as below:

@Service
public class EventReceiver {

    @Autowired
    private MessageSender messageSender;

    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order) throws Exception {
        // code for processing order
    }
}

My RabbitConfiguration is as below:

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        factory.setMessageConverter((MessageConverter) jackson2Converter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        return connectionFactory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myRabbitListenerContainerFactory());
    }

    @Autowired
    private EventReceiver receiver;
}

Any help on how to use manual channel acknowledgement with the above style of configuration will be appreciated. If we implement ChannelAwareMessageListener, the onMessage signature will change. Can we implement ChannelAwareMessageListener on a service?

Gary Russell
Gururaj Nayak
  • 2
    One question is why you even need to do this. If your code is like in your answer below, (reject on failure, ack otherwise), the container will do that automatically for you with AUTO ack mode - if the listener throws an exception the message will be rejected; otherwise acked. – Gary Russell Aug 08 '16 at 19:19

3 Answers

47

Add the Channel to the @RabbitListener method...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

and use the tag in the basicAck or basicReject call.
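
As a rough sketch of that ack/reject flow, the Java below replaces the broker with an in-memory recorder so it runs without RabbitMQ or Spring. `AckChannel`, `RecordingChannel`, `ManualAckSketch` and `process` are hypothetical names introduced here; only the two method shapes, `basicAck(long deliveryTag, boolean multiple)` and `basicReject(long deliveryTag, boolean requeue)`, are taken from `com.rabbitmq.client.Channel` (whose real versions also throw `IOException`). In a real listener, `receive` would carry the `@RabbitListener` and `@Header(AmqpHeaders.DELIVERY_TAG)` annotations shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.rabbitmq.client.Channel, reduced to the two calls used here.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void basicReject(long deliveryTag, boolean requeue);
}

// Records each call so the sketch can run without a broker.
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    @Override
    public void basicReject(long deliveryTag, boolean requeue) {
        calls.add("reject:" + deliveryTag + ":requeue=" + requeue);
    }
}

class ManualAckSketch {

    // Same parameter shape as the listener method: payload, channel, delivery tag.
    static void receive(String payload, AckChannel channel, long tag) {
        try {
            process(payload);
            // Processing succeeded: acknowledge exactly this delivery.
            channel.basicAck(tag, false);
        } catch (RuntimeException e) {
            // Processing failed: reject this delivery without requeueing it.
            channel.basicReject(tag, false);
        }
    }

    // Placeholder business logic (assumption): an empty payload counts as a failure.
    static void process(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        receive("order-1", channel, 1L); // succeeds, so it is acked
        receive("", channel, 2L);        // fails, so it is rejected
        System.out.println(channel.calls);
    }
}
```

Rejecting with `requeue=false` is a choice made for this sketch; with `requeue=true`, a message that always fails can be redelivered repeatedly.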

EDIT

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual
gstackoverflow
Gary Russell
  • 1
    Thanks for the suggestion. We tried it and put in the line `channel.basicAck('100001', false)`. Now, irrespective of whether I put "true" or "false" in the above line of code, the listener and queue go into an infinite loop. Can you help me get around this? – Gururaj Nayak Aug 03 '16 at 15:42
  • We finally solved the issue. I am documenting this for the benefit of others. – Gururaj Nayak Aug 04 '16 at 18:36
  • I got an error that spring.rabbitmq.listener.acknowledge-mode is a deprecated property. I ended up setting this property on my RabbitListenerContainerFactory Bean and got it working that way. – Elaina Feb 15 '18 at 17:54
  • 17
    The property was [moved to `spring.rabbitmq.listener.simple.acknowledge-mode`](https://github.com/spring-projects/spring-boot/blob/1.5.x/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java#L484). In Spring Boot 2.0 it can be `spring.rabbitmq.listener.simple.acknowledge-mode` or `spring.rabbitmq.listener.direct.acknowledge-mode` because Spring AMQP now supports 2 container types. See [the documentation](https://docs.spring.io/spring-boot/docs/2.0.0.RC1/reference/html/common-application-properties.html). – Gary Russell Feb 15 '18 at 18:39
  • 14
    Could someone explain to me why Java programmers refuse to put the imports in their code examples? I feel like it would save me hours. – Lynch Apr 06 '18 at 15:43
  • Too much noise - if you use an IDE (e.g. Eclipse/IntelliJ) it will make suggestions and it's usually pretty obvious if there are multiple matches. This particular example is in my [sandbox github repo](https://github.com/garyrussell/sandbox/blob/master/so38728668/src/main/java/com/example/So38728668Application.java). – Gary Russell Apr 06 '18 at 15:59
  • The info on how to use the application properties to set acknowledge-mode to manual is very important! One expects to configure this in the parameters of the @RabbitListener annotation. – Datz Apr 30 '18 at 05:25
16

If you need to use onMessage() from the ChannelAwareMessageListener interface, you can do it this way.

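@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    // SLF4J logger; assumed here because the original snippet uses `log` without declaring it.
    private static final Logger log = LoggerFactory.getLogger(MyMessageListener.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("Message received.");
        // do something with the message
        // Acknowledge this delivery only; basicAck can throw IOException, hence the throws clause.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}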

And for the Rabbit configuration:

@Configuration
public class RabbitConfig {

    public static final String topicExchangeName = "exchange1";

    public static final String queueName = "queue1";

    public static final String routingKey = "queue1.route.#";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("xxxx");
        connectionFactory.setPassword("xxxxxxxxxx");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vHost1");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }


    @Bean
    public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(myRabbitMessageListener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConcurrency("4");
        listenerContainer.setPrefetchCount(20);
        return listenerContainer;
    }
}
Pang
Parisana Ng
-2

Thanks for Gary's help, I finally solved the issue. I am documenting this for the benefit of others; it should be covered in the Spring AMQP reference documentation. The service class is as below.

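@Service
public class Consumer {

    // Assumption: the queue name is injected from the same property the listener uses.
    @Value("${eventqueue}")
    private String eventQueue;

    // The method name can be anything, but it should take the Channel as its second parameter.
    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order, Channel channel) throws Exception {
        // NOTE: as the comments below explain, the listener container is already the consumer,
        // and basicGet fetches the next message rather than the one passed to this method.
        channel.basicConsume(eventQueue, false, channel.getDefaultConsumer());
        // Get the delivery tag
        long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
        try {
            // code for processing order

            // If all logic is successful
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // handle exception
            channel.basicReject(deliveryTag, true);
        }
    }
}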

The configuration has also been modified as below:

public class RabbitApplication implements RabbitListenerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(RabbitApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Autowired
    private Consumer consumer;

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

      ...
}

Note: there is no need to configure the Rabbit ConnectionFactory, the container factory, etc., since the annotations implicitly take care of all this.

Gururaj Nayak
  • 1
    NO - you should NOT issue a `basicConsume` or `basicGet` against the channel - `basicGet` will fetch another message. The listener container is already consuming from it and the message being used to invoke the method has a different delivery tag. Instead, use `@Header(AmqpHeaders.DELIVERY_TAG) long tag`. See my answer (edit). – Gary Russell Aug 04 '16 at 20:43
  • Gary, I removed basicConsume and basicGet and used the @Header(AmqpHeaders.DELIVERY_TAG) long tag in basicAck/basicReject. The flow stopped working: the queue is fetched again and again in an infinite loop and the destination queue is populated. I reverted the code back to basicGet and basicConsume and it is working. – Gururaj Nayak Aug 08 '16 at 15:00
  • But it's NOT working - you are acking (and dropping) the next message. – Gary Russell Aug 08 '16 at 15:14
  • So you mean I should use some sort of consume to avoid it? It is not clear, since you said earlier that we should not be using basicConsume. – Gururaj Nayak Aug 08 '16 at 18:10
  • The listener container __is__ the consumer - `getDefaultConsumer()` does nothing in this case (returns `null`) and `basicGet` fetches the next message in the queue. – Gary Russell Aug 08 '16 at 18:22
  • 4
    I just wrote a quick Spring Boot app and it works exactly as I described - I edited my answer with the code. The full project is [here](https://github.com/garyrussell/sandbox/tree/master/so38728668) and the [commit](https://github.com/garyrussell/sandbox/commit/7ff094bd02339538e6f4f8903628d12b1c1a36c0). If you set a breakpoint on `basicAck` you can see the un-acked message in the Rabbit Admin UI; step over, and the message is acked. – Gary Russell Aug 08 '16 at 18:49
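
The point made in these comments, that an extra `basicGet` inside the listener fetches and acks a different message, can be illustrated with a toy model. The sketch below is not the RabbitMQ client API: `ToyChannel`, `WrongTagSketch` and their methods are hypothetical, and the model only assumes that every delivery on a channel, pushed or fetched, gets its own increasing delivery tag.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory model of one queue consumed over one channel.
// Hypothetical: this is NOT the RabbitMQ client API, only an illustration.
class ToyChannel {
    final Deque<String> ready = new ArrayDeque<>();          // messages waiting in the queue
    final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered but not yet acked
    private long nextTag = 1;                                // delivery tags count up per channel

    void publish(String body) {
        ready.addLast(body);
    }

    // Takes the next ready message and returns its delivery tag (-1 if the queue is empty).
    // Models both the container's delivery and a manual get.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    void ack(long tag) {
        unacked.remove(tag);
    }
}

class WrongTagSketch {

    // Listener that obtains a tag through an extra get, as in the answer above.
    static ToyChannel ackViaExtraGet() {
        ToyChannel channel = new ToyChannel();
        channel.publish("order-A");
        channel.publish("order-B");
        channel.deliver();                // container hands order-A to the listener (tag 1)
        long fetched = channel.deliver(); // extra get pulls order-B (tag 2)
        channel.ack(fetched);             // acks order-B, which was never processed
        return channel;
    }

    // Listener that acks the tag it was invoked with.
    static ToyChannel ackViaDeliveredTag() {
        ToyChannel channel = new ToyChannel();
        channel.publish("order-A");
        channel.publish("order-B");
        long delivered = channel.deliver(); // container hands order-A to the listener (tag 1)
        channel.ack(delivered);             // acks order-A itself
        return channel;
    }

    public static void main(String[] args) {
        ToyChannel wrong = ackViaExtraGet();
        System.out.println("extra get:     unacked=" + wrong.unacked + " ready=" + wrong.ready);
        ToyChannel right = ackViaDeliveredTag();
        System.out.println("delivered tag: unacked=" + right.unacked + " ready=" + right.ready);
    }
}
```

In the first case order-A stays unacknowledged while order-B disappears without being processed; in the second, order-A is acknowledged and order-B is still waiting in the queue.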