2

I'm using the RabbitMQ 3.6.10 UI to publish a message that is received by my Java application that uses Spring Integration AMQP 4.3.11. The message is a reply to an earlier message that was created using a Splitter, so it had a sequenceNumber and sequenceSize headers. I copy these headers to the reply and set them to the type Number in the RabbitMQ UI. However, on the Java side, I'm getting an exception:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
    at org.springframework.util.Assert.isTrue(Assert.java:92)
    at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
    at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
    at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
    ... 14 more

I checked that the types of the sequenceNumber and sequenceSize headers on the Java side are Long, instead of Integer. There is however no option to make this difference in the RabbitMQ UI. The messages will be sent by a non-Java application, so how do I make sure the headers get recognised as Integer by Spring Integration?

When I publish the reply using a Java client and set the header values to Integer, then the consumer accepts them. So this is probably a limitation of the RabbitMQ UI not having the header types specific enough (eg. 32-bit vs. 64-bit number) or Java client being too strict about the expected value type. Can anyone confirm one or the other?

Adam Michalik
  • 9,678
  • 13
  • 71
  • 102
  • What do you mean by "set them to type" if you say that you just propagate them? Are you testing something manually, e.g. the reply case? – Artem Bilan Sep 26 '17 at 10:51
  • @ArtemBilan - yes, I'm testing the reply case manually via the RabbitMQ UI. There the only possible header types are: String, Number, Boolean, List. By "propagate" I meant here "copying the value manually". – Adam Michalik Sep 26 '17 at 11:08
  • 1
    I think we should be more lenient; see my answer for a work-around. – Gary Russell Sep 26 '17 at 14:01

1 Answers1

1

Add a MessagePostProcessor to the adapter's listener container...

@Bean
public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
    adapter.setOutputChannelName("someChannel");
    return adapter;
}

@Bean
public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
    container.setQueueNames("foo");
    container.setAfterReceivePostProcessors(m -> {
        if (m.getMessageProperties().getHeaders()
                .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
            Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
            m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
                    sequenceNumber);
        }
        return m;
    });
    return container;
}

Please open a JIRA Issue - we should probably be more lenient, especially if the value is < Integer.MAX_VALUE.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Yes, you can do the same using `HeaderEnricher` before `Aggregator`. I mean remap `IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER` to itself but using `.intValue()` and `overwrite=true` option. – Artem Bilan Sep 26 '17 at 14:01
  • @Artem - the failure is in the channel adapter `at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)` – Gary Russell Sep 26 '17 at 14:03
  • Doh... Indeed `IntegrationMessageHeaderAccessor.verifyType()` is restricted to only `Integer` on the matter. Not my day today... Sorry for the noise. – Artem Bilan Sep 26 '17 at 14:06
  • 1
    Thanks, Gary. I created [INT-4349](https://jira.spring.io/browse/INT-4349). For now I'll use a simple Java publisher for testing - this one sets the header types in a compatible way. – Adam Michalik Sep 26 '17 at 14:24