I am dealing with a message ordering issue. I fixed it a while ago, but now the fix no longer works.

For an overview, the environment is: TCP Server -> tcpAdapter -> RabbitMQ -> message receiver.

The order is lost somewhere between tcpAdapter and the message receiver.

I fixed this issue using:

  1. on the producer side - publisher confirms and returns:

      rabbitmq:
        publisher-confirms: true
        publisher-returns: true

  2. on the consumer side - enforcing a single-thread executor. I found the idea here: RabbitMQ - Message order of delivery, and I used a post processor for this:

@Component
public class RabbitConnectionFactoryPostProcessor implements BeanPostProcessor {
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (bean instanceof CachingConnectionFactory) {
      ((CachingConnectionFactory) bean).setExecutor(Executors.newSingleThreadExecutor());
    }
    return bean;
  }
}
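
The idea behind the single-thread executor can be illustrated in plain Java, independent of Spring: tasks handed to `Executors.newSingleThreadExecutor()` run one at a time in submission order, so a consumer that dispatches on such an executor cannot reorder deliveries by itself. This is only a minimal sketch; `SingleThreadOrderDemo` and `runInOrder` are hypothetical names and not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only illustrates executor ordering, not Spring AMQP.
class SingleThreadOrderDemo {

    // Submits tasks 0..count-1 and returns the order in which they actually ran.
    static List<Integer> runInOrder(ExecutorService executor, int count) {
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int sequence = i;
            executor.execute(() -> processed.add(sequence));
        }
        executor.shutdown();
        try {
            // Wait for all submitted tasks to finish before reading the result.
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> order = runInOrder(Executors.newSingleThreadExecutor(), 1000);
        System.out.println("first=" + order.get(0) + ", last=" + order.get(order.size() - 1));
    }
}
```

Passing `Executors.newFixedThreadPool(4)` instead gives no such guarantee, which is why the executor only helps when the messages already arrive at the consumer in the right order.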

Now, after some master-pom updates (we do not control the master pom; it is managed at project level), the fix suddenly no longer works. I compared the differences and did not see any changes to spring-rabbit or spring-amqp, so I do not understand why there is an impact.


Here are more details if you want concrete examples:

  1. Producer.

The TCP Server sends a message to the tcpAdapter app, which uses spring-integration flow to take the message from TCP and send it to rabbitmq.

Here is the code that does this (I did not post inboundAdapterClient here because I do not think it is important):

  @Bean
  public IntegrationFlow tcpToRabbitFlowClient() {
    return IntegrationFlows.from(inboundAdapterClient())
        .transform(tcpToRabbitTransformer)
        .channel(TCP_ADAPTER_SOURCE)
        .get();
  }

Messages are received by the tcpAdapter app from TCP in the right order, but the tcpAdapter RabbitMQ stack does not always send them in the correct order (about 80% of the time the order is correct, 20% of the time it is wrong).
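
To find out on which side the order is lost, one option is to stamp every message with an increasing sequence number in the tcpAdapter before it goes to RabbitMQ, and to check that number in the message receiver. The sketch below shows only the checking part, in plain Java. `SequenceGapDetector` and its methods are hypothetical names, and how the sequence number travels (for example as a message header) is an assumption left open here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; class and method names are illustrative only.
class SequenceGapDetector {
    private long lastSeen = -1;
    private final List<String> violations = new ArrayList<>();

    // Call once per received message with the sequence number stamped by the producer.
    synchronized void onMessage(long sequence) {
        if (sequence <= lastSeen) {
            // A lower (or repeated) number after a higher one means reordering.
            violations.add("received " + sequence + " after " + lastSeen);
        } else {
            lastSeen = sequence;
        }
    }

    // Returns a copy of the reordering events recorded so far.
    synchronized List<String> violations() {
        return new ArrayList<>(violations);
    }
}
```

If the detector reports violations even when the consumer handles messages on a single thread, the reordering most likely happens before the messages reach the queue.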

Here is the spring boot yml configuration (only relevant info):

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
  cloud:
    stream:
      bindings:
        tcpAdapterSource:
          binder: rabbit
          content-type: application/json
          destination: tcpadapter.messagereceiver

  2. Consumer.

The message receiver has the single thread executor enforced plus the configuration as below.

Here is the spring boot yml configuration (only relevant info):

spring:
  cloud:     
    stream:
      bindings:
        fromTcpAdapter:
          binder: rabbit
          content-type: application/json
          destination: tcpadapter.messagereceiver
      rabbit:
        default:
          producer:
            exchangeDurable: false
            exchangeAutoDelete: true
          consumer:
            exchangeDurable: false
            exchangeAutoDelete: true

Note: There is only one producer and one consumer.

Some versions from the pom, in case it helps:

      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
        <version>2.2.4.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>2.2.3.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.2.3.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>3.0.1.RELEASE</version>
      </dependency>
Artem Bilan
Florin
  • Would you mind to experiment not consuming messages from the queue, but rather confirm that they are really placed into in a desired order? I mean it might really be the fact not from your consumer side, but on the Spring Cloud Stream producer one... – Artem Bilan Mar 31 '21 at 14:58
  • What was the previous spring-rabbit version? When using publisher confirms, returning the channel to the cache is now delayed until the confirm is received, which can cause subsequent sends to go on a different channel and they might arrive out of order. If you can upgrade spring-rabbit to 2.3.x, you could use the `ThreadChannelConnectionFactory`, which ensures that the same channel is always used for all sends by each thread. – Gary Russell Mar 31 '21 at 14:59
  • @ArtemBilan So I should use a man-in-the-middle between my app and the rabbitmq broker to check the messages? And if I see the order is lost on the producer, is there any solution than upgrading to spring-rabbit 2.3.x to use ThreadChannelConnectionFactory? – Florin Mar 31 '21 at 15:25
  • Question for GaryRussell: Could you tell me how would I use the ThreadChannelConnectionFactory? Is there a special property that I could set in the application.yml for spring cloud stream to enable this? Or should I do it via @Configuration class? Do you have any examples because from the documentation I could not see the solution. Thank you so much! – Florin Mar 31 '21 at 15:27
  • The "man in the middle" is just a RabbitMQ Management Console: https://www.rabbitmq.com/management.html. There you can navigate to the queue and see its content. Of course, if your consumer is UP, the messages are going to be pulled from the queue you can't make any assumption. – Artem Bilan Mar 31 '21 at 15:28
  • @GaryRussell I looked up the ThreadChannelConnectionFactory and I can`t seem to make it work with RabbitTemplate. Can you offer us some examples? – Andreea Jun 08 '21 at 15:22
  • Don't ask new questions in comments. It doesn't help people searching for Q&A. Ask a new question showing what you have tried (code and configuration) and exactly what you mean by "can't seem to make it work". – Gary Russell Jun 08 '21 at 16:09
  • @GaryRussell its the same question as Florin asked. We do not know how to use ThreadChannelConnectionFactory. I have looked at the documentation but it does not have a concrete example. But it is okay , I will create another question regarding this – Andreea Jun 11 '21 at 07:26

1 Answer

Solved by removing the yml configuration and using explicit bean declarations and factory configuration, as described below. The only drawback is that it is slow performance-wise, but this is expected with publisher confirms.

So indeed it was a producer problem.

  @Bean
  public CachingConnectionFactory connectionFactory() {
    com.rabbitmq.client.ConnectionFactory connectionFactoryClient = new com.rabbitmq.client.ConnectionFactory();
    connectionFactoryClient.setUsername(username);
    connectionFactoryClient.setPassword(password);
    connectionFactoryClient.setHost(hostname);
    connectionFactoryClient.setVirtualHost(vhost);
    return new CachingConnectionFactory(connectionFactoryClient);
  }

  @Bean("rabbitTemplateAdapter")
  @Primary
  public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
    connectionFactory.setPublisherConfirmType(CORRELATED);
    connectionFactory.setPublisherReturns(true);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause)
            -> log.debug("correlationData({}),ack({}),cause ({})", correlationData, ack, cause));
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)
            -> log.debug("exchange({}),route({}),replyCode({}),replyText({}),message:{}",
            exchange, routingKey, replyCode, replyText, message));
    return rabbitTemplate;
  }

And for sending messages:

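    // All operations inside invoke() run on the same dedicated channel.
    rabbitTemplateAdapter.invoke(t -> {
      t.convertAndSend(
          exchange,
          DESTINATION,
          jsonMessage.getPayload(),
          m -> {
            outboundMapper().fromHeadersToRequest(jsonMessage.getHeaders(), m.getMessageProperties());
            return m;
          });
      // Block until the broker confirms the send; throws on a nack or after 10 s.
      t.waitForConfirmsOrDie(10_000);
      return true;
    });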

I did this using the following spring-rabbit and spring-amqp versions:

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.2.3.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-amqp</artifactId>
  <version>2.2.3.RELEASE</version>
</dependency>

The Spring AMQP documentation helped a lot; the technique used is called "Scoped Operations": https://docs.spring.io/spring-amqp/docs/2.2.7.RELEASE/reference/html/#scoped-operations
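
Why waiting for each confirm restores the order, and why it costs throughput, can be shown with a plain-Java model. This is only a sketch under assumptions: `FakeBroker` is a hypothetical stand-in for RabbitMQ, its worker threads play the role of different channels, and none of it is the Spring AMQP API. When the sender waits for every confirm before the next send, messages are accepted strictly in send order, at the price of one round trip per message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Plain-Java model only; FakeBroker is hypothetical and not a RabbitMQ or Spring class.
class ConfirmedSendModel {

    static class FakeBroker {
        final List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        private final ExecutorService workers = Executors.newFixedThreadPool(4);

        // Accepts the message on some worker after a random delay; the future is the "confirm".
        CompletableFuture<Void> publish(int message) {
            return CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(5));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                received.add(message);
            }, workers);
        }

        void shutdown() {
            workers.shutdown();
        }
    }

    // Sends 0..count-1 and returns the order in which the fake broker accepted them.
    static List<Integer> sendAll(int count, boolean waitForConfirm) {
        FakeBroker broker = new FakeBroker();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                CompletableFuture<Void> confirm = broker.publish(i);
                if (waitForConfirm) {
                    confirm.get(10, TimeUnit.SECONDS); // block before the next send
                } else {
                    pending.add(confirm); // fire and forget; order is not guaranteed
                }
            }
            for (CompletableFuture<Void> confirm : pending) {
                confirm.get(10, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            broker.shutdown();
        }
        return new ArrayList<>(broker.received);
    }

    public static void main(String[] args) {
        System.out.println("with confirms:    " + sendAll(10, true));
        System.out.println("without confirms: " + sendAll(10, false));
    }
}
```

In this model the run without waiting may or may not come out reordered on any given execution, much like the intermittent behaviour described in the question.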

Florin