12

I am facing an issue in receiving a message from RabbitMQ. I am sending a message like below

        HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);

If we see in RabbitMQ, we will get a fully qualified type.

In the current scenario, we have n number of producer for the same consumer. If i use any mapper, it leads to an exception. How will i send a message so that it doesn't contain any type_id and i can receive the message as Message object and later i can bind it to my custom object in the receiver.

I am receiving message like below. Could you please let me know how to use Jackson2MessageConverter so that message will get directly binds to my Object/HashMap from Receiver end. Also i have removed the Type_ID now from the sender.

How Message looks in RabbitMQ

priority: 0 delivery_mode: 2 headers:
ContentTypeId: java.lang.Object KeyTypeId: java.lang.Object content_encoding: UTF-8 content_type: application/json {"Execution_start_time":1473747183636,"status":"SUCCESS"}

@Component
public class AdapterOutputHandler {

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message){

        System.out.println("Receiver:::::::::::"+message.toString());

    }

}

Connection

@Bean(name="adapterOPListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);

    }

Exception

Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
    at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)

I don't want to use __TYPE__ID from sender because they are multiple senders for the same queue and only one consumer.

BIndu_Madhav
  • 577
  • 1
  • 8
  • 21
  • *it leads to an exception* is not enough Informations. Add the stacktrace please – Jens Sep 12 '16 at 05:36
  • Actually headers in rabbitmq contains a property called type_id_. This shouldn't be. **How to send a message in which type_id_ property is not present** `priority: 0 delivery_mode: 2 __TypeId__: com.diff.approach.JobListenerDTO** content_encoding: UTF-8 content_type: application/json` – BIndu_Madhav Sep 12 '16 at 09:28

2 Answers2

7

it leads to an exception

What exception?

TypeId: com.diff.approach.JobListenerDTO

That means you are sending a DTO, not a hash map as you describe in the question.

If you want to remove the typeId header, you can use a message post processor...

rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
    m.getMessageProperties.getHeaders().remove("__TypeId__");
    return m;
});

(or , new MessagePostProcessor() {...} if you're not using Java 8).

EDIT

What version of Spring AMQP are you using? With 1.6 you don't even have to remove the __TypeId__ header - the framework looks at the listener parameter type and tells the Jackson converter the type so it automatically converts to that (if it can). As you can see here; it works fine without removing the type id...

package com.example;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39443850Application {

    private static final String QUEUE = "so39443850";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
        context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
        context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
        context.close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
    public void listen(HashMap<String, Object> message) {
        System.out.println(message.getClass() + ":" + message);
        latch.countDown();
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    public static class DTO {

        private String foo;

        private String baz;

        public DTO(String foo, String baz) {
            this.foo = foo;
            this.baz = baz;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        public String getBaz() {
            return this.baz;
        }

        public void setBaz(String baz) {
            this.baz = baz;
        }

    }

}

Result:

class java.util.HashMap:{foo=baz, baz=qux}

This is described in the documentation...

In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.

You can also configure a custom ClassMapper to always return HashMap.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Thanks Gary, One more question, How to receive this message ? `headers: __ContentTypeId__: java.lang.Object __KeyTypeId__: java.lang.Object content_encoding: UTF-8 content_type: application/json` – BIndu_Madhav Sep 13 '16 at 05:11
  • I am receiving the message like below `@RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE) public void handleAdapterQueueMessage(Message message){ byte[] body = message.getBody(); }` How to convert from byte[] to hashmap back ?? – BIndu_Madhav Sep 13 '16 at 07:59
  • Don't put code in comments - it's unreadable - edit your question instead. You need a `Jackson2JsonMessageConverter` in the listener container factory. – Gary Russell Sep 13 '16 at 12:05
  • Hello Gary, I have added my receiver code, could you please suggest how to use `Jackson2JsonMessageConverter` – BIndu_Madhav Sep 14 '16 at 03:43
  • Version of Spring AMQP - `1.6.1.RELEASE` Version of Spring RabbitMQ - `1.5.6.RELEASE` – BIndu_Madhav Sep 15 '16 at 04:30
  • Why are you using mismatched versions? That won't work; they need to be the same version. If you are using maven or gradle, you only need to declare `spring-rabbit` and the corresponding version of `spring-amqp` will be pulled in automatically (transitively). Regardless, you should always use the same versions for both jars; the current version is 1.6.2.RELEASE - see [the project page](http://projects.spring.io/spring-amqp/). – Gary Russell Sep 15 '16 at 12:23
  • Yes Gary. I used the same versions for both now. It worked. Thanks! – BIndu_Madhav Sep 15 '16 at 14:47
0
  • Want to use "a" different Java calss when receive message?

    Config @Bean Jackson2JsonMessageConverter with a custom ClassMapper

  • Want to use "many" different Java calss when receive message? such as :

    @MyAmqpMsgListener
    void handlerMsg(
            // Main message class, by MessageConverter
            @Payload MyMsg myMsg, 
    
            // Secondary message class - by MessageConverter->ConversionService
            @Payload Map<String, String> map,
    
            org.springframework.messaging.Message<MyMsg> msg,
            org.springframework.amqp.core.Message amqpMsg
    ) {
        // ...
    }
    

    Provide a custom @Bean Converter, ConversionServiceRabbitListenerAnnotationBeanPostProcessor :

    @Bean
    FormattingConversionServiceFactoryBean rabbitMqCs(
            Set<Converter> converters
    ) {
        FormattingConversionServiceFactoryBean fac = new FormattingConversionServiceFactoryBean();
        fac.setConverters(converters);
        return fac;
    }
    @Bean
    DefaultMessageHandlerMethodFactory messageHandlerMethodFactory(
            @Qualifier("rabbitMqCs")
            FormattingConversionService rabbitMqCs
    ) {
        DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
        defaultFactory.setConversionService(rabbitMqCs);
        return defaultFactory;
    }
    
    // copied from RabbitBootstrapConfiguration
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor(
            MessageHandlerMethodFactory handlerFac
    ) {
        RabbitListenerAnnotationBeanPostProcessor bpp = new RabbitListenerAnnotationBeanPostProcessor();
        bpp.setMessageHandlerMethodFactory(handlerFac);
        return bpp;
    }
    
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
    

References:

btpka3
  • 3,720
  • 2
  • 23
  • 26