1

I am having 2 applications exchanging data using RabbitMQ. I have implemented this using Spring AMQP. I have scenario once the message has been consumed from the consumer might encounter an exception while processing.

If any exception comes i am planning to log into the database. I have to remove message from the queue explicitly once the message reaches the consumer whether it is successful processing or error encountered.

How to forcefully remove the message from queue otherwise it will be there if my application fails to process it?

Below is my Listener code

 @RabbitListener(containerFactory="rabbitListenerContainerFactory",queues=Constants.JOB_QUEUE)
            public void handleMessage(JobListenerDTO jobListenerDTO) {
                //System.out.println("Received summary: " + jobListenerDTO.getProcessXML());
                //amqpAdmin.purgeQueue(Constants.JOB_QUEUE, true);
                try{
                    Map<String, Object> variables = new HashMap<String, Object>();  
                    variables.put("initiator", "cmy5kor");

                    Deployment deploy = repositoryService.createDeployment().addString(jobListenerDTO.getProcessId()+".bpmn20.xml",jobListenerDTO.getProcessXML()).deploy();
                    ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(jobListenerDTO.getProcessId(), variables);

                    System.out.println("Process Instance is:::::::::::::"+processInstance);

                }catch(Exception e){

                    e.printStackTrace();
            }

Configuration Code

@Configuration
@EnableRabbit
public class RabbitMQJobConfiguration extends AbstractBipRabbitConfiguration {


    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setQueue(Constants.JOB_QUEUE);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }


    @Bean
    public Queue jobQueue() {
        return new Queue(Constants.JOB_QUEUE);
    }


    @Bean(name="rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
        idClassMapping.put("com.bosch.diff.approach.TaskMessage", JobListenerDTO.class);
        classMapper.setIdClassMapping(idClassMapping);
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);
        factory.setReceiveTimeout(10L);
        return factory;
    }



}
Chandan
  • 333
  • 1
  • 6
  • 14

2 Answers2

3

I don't know about spring api or configuration for rmq but this

 I have to remove message from the queue explicitly once the message reaches the consumer whether it is successful processing or error encountered.

is exactly what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue.

cantSleepNow
  • 9,691
  • 5
  • 31
  • 42
1

As long as your listener catches the exception the message will be removed from the queue.

If your listener throws an exception, it will be requeued by default; that behavior can be modified by throwing a AmqpRejectAndDontRequeueException or setting the defaultRequeueRejected property - see the documentation for details.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Thanks Gary. **Do we have some examples in github so that i can read and implement?** – Chandan Aug 29 '16 at 03:38
  • See [the samples](http://docs.spring.io/spring-amqp//reference/html/_reference.html#sample-apps). – Gary Russell Aug 29 '16 at 12:23
  • @GaryRussell, From what I know rabbitmq removes message from queue **after** getting ACK, yeah ? So when spring-boot sends ACK ? In case arbitrary (with except for `AmqpRejectAndDontRequeueException`) uncaught exception from listener ? Ok, but what in case of uncauht exception from my own converter (e.g parser exceptions) ? –  Mar 23 '17 at 10:01
  • See my answer to [your question](http://stackoverflow.com/questions/42964342/sending-acknowledgment-to-rabbitmq-server-in-depends-on-converter-and-listener/42975467#42975467). – Gary Russell Mar 23 '17 at 12:04
  • Hi, can you please update the link. It says invalid. – z atef Apr 16 '20 at 18:30