I am dealing with a message ordering issue. I fixed it a while ago, but now the fix no longer works.
Just for overview, I have the following environment:
The order is lost somewhere between tcpAdapter and the message receiver.
I previously fixed this issue using:
- on the producer side - using publisher confirms and returns
rabbitmq:
  publisher-confirms: true
  publisher-returns: true
- on the consumer side - enforcing a single-thread executor. I found the idea here: RabbitMQ - Message order of delivery, and I used a post processor to apply it.
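import java.util.concurrent.Executors;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

@Component
public class RabbitConnectionFactoryPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Replace the connection factory's executor with a single-thread one
        if (bean instanceof CachingConnectionFactory) {
            ((CachingConnectionFactory) bean).setExecutor(Executors.newSingleThreadExecutor());
        }
        return bean;
    }
}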
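To make the assumption behind this fix explicit, here is a minimal JDK-only sketch of the guarantee I am relying on: tasks handed to a single-thread executor run in submission order. The class and method names (SingleThreadOrderingSketch, runInOrder) are made up for illustration. It only demonstrates the executor behaviour; it does not show that the executor set by the post processor is still the one used for message dispatch after the dependency update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the application code.
class SingleThreadOrderingSketch {

    // Submits 'count' numbered tasks and returns the order in which they actually ran.
    static List<Integer> runInOrder(ExecutorService executor, int count) {
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int seq = i;
            executor.execute(() -> executed.add(seq));
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return executed;
    }

    public static void main(String[] args) {
        List<Integer> order = runInOrder(Executors.newSingleThreadExecutor(), 1000);
        boolean inOrder = true;
        for (int i = 0; i < order.size(); i++) {
            if (order.get(i) != i) {
                inOrder = false;
            }
        }
        System.out.println("single thread kept submission order: " + inOrder);
    }
}
```

Passing Executors.newFixedThreadPool(4) instead would remove that guarantee, which is why I expected the single-thread executor to be enough on the consumer side.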
Now, after some master-pom updates (we do not control the master pom; it is managed at project level), the fix suddenly no longer works. When I checked the differences, I did not see any changes to spring-rabbit or spring-amqp, so I do not understand why there is an impact.
Here are more details if you want concrete examples:
- Producer.
The TCP server sends a message to the tcpAdapter app, which uses a Spring Integration flow to take the message from TCP and send it to RabbitMQ.
Here is the code that does this (I did not post inboundAdapterClient because I do not think it is relevant):
@Bean
public IntegrationFlow tcpToRabbitFlowClient() {
    return IntegrationFlows.from(inboundAdapterClient())
            .transform(tcpToRabbitTransformer)
            .channel(TCP_ADAPTER_SOURCE)
            .get();
}
Messages are received by the tcpAdapter app from TCP in the right order, but the tcpAdapter RabbitMQ stack does not always send them in that order (about 80% of the time the order is correct, 20% of the time it is wrong).
Here is the spring boot yml configuration (only relevant info):
spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
  cloud:
    stream:
      bindings:
        tcpAdapterSource:
          binder: rabbit
          content-type: application/json
          destination: tcpadapter.messagereceiver
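One way to narrow down where the order is lost would be to stamp each message with an increasing sequence number as it arrives from TCP and check that number at each later hop (just before publishing, and again in the receiver). The sketch below is only an illustration of that check in plain Java. SequenceOrderChecker is a hypothetical helper, and how the sequence number travels with the message (for example in a header) is an assumption, not something the current code does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper: records every sequence number that arrives
// after a higher one has already been seen.
class SequenceOrderChecker {
    private long highestSeen = -1;
    private final List<String> violations = new ArrayList<>();

    // Call once per message with the sequence number stamped at the TCP side.
    synchronized void observe(long seq) {
        if (seq <= highestSeen) {
            violations.add("received " + seq + " after " + highestSeen);
        } else {
            highestSeen = seq;
        }
    }

    // Returns a copy of the out-of-order observations recorded so far.
    synchronized List<String> violations() {
        return new ArrayList<>(violations);
    }

    public static void main(String[] args) {
        SequenceOrderChecker checker = new SequenceOrderChecker();
        long[] arrivals = {0, 2, 1, 3};
        for (long seq : arrivals) {
            checker.observe(seq);
        }
        System.out.println(checker.violations());
    }
}
```

Running one checker at the point where the flow hands the message to the channel and another in the receiver would show whether the reordering already exists before the message leaves tcpAdapter.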
- Consumer.
The message receiver has the single thread executor enforced plus the configuration as below.
Here is the spring boot yml configuration (only relevant info)
spring:
  cloud:
    stream:
      bindings:
        fromTcpAdapter:
          binder: rabbit
          content-type: application/json
          destination: tcpadapter.messagereceiver
      rabbit:
        default:
          producer:
            exchangeDurable: false
            exchangeAutoDelete: true
          consumer:
            exchangeDurable: false
            exchangeAutoDelete: true
Note: There is only one producer and one consumer.
Some versions from the pom, in case they help:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot</artifactId>
    <version>2.2.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>3.0.1.RELEASE</version>
</dependency>