1

We're using Spring RabbitMQ Binder to partition queue. We're consuming queue and then based on the our implementation of PartitionKeyExtractorStrategy we send messages to queue partitions. It is important for us that messages which get into queue partitions preserved their order but they are not for some reason. We see from logs of PartitionKeyExtractorStrategy implementation that messages consumed from main queue are in the right order. Could it be that partition producer sends messages to queue partitions async or using multiple channels so that order is broke from time to time?

This is our application.yml config:

spring:
  cloud:
    stream:
      bindings:
        mainQueue:
          destination: TopicExchange
          group: MainQueue
          consumer:
            partitioned: false
            concurrency: 1
            maxAttempts: 1
        partitionProducer:
          destination: TopicExchange
          producer:
            partitionCount: ${REPLICAS}
            partitionKeyExtractorName: userIdKeyExtractor
...

rabbit:
  bindings:
    mainQueue:
      consumer:
        bindingRoutingKeyDelimiter: ","
        bindingRoutingKey: routingKey1, routingKey2
        declareExchange: true
        queueNameGroupOnly: true
        exclusive: true
        prefetch: 100
        batchSize: 100
        transacted: true
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
    partitionProducer:
      producer:
        declareExchange: true
    partitionConsumer:
      consumer:
        declareExchange: true
        queueNameGroupOnly: true
        prefetch: 100
        txSize: 1
        transacted: true
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
        enableBatching: true
        batchSize: 1
        receiveTimeout: 100
    queryConsumer:
      consumer:
        anonymousGroupPrefix: com.some.Query-
        bindingRoutingKeyDelimiter: ","
        bindingRoutingKey: Event1,Event2,Event3
        declareExchange: true
        queueNameGroupOnly: true
        prefetch: 1
        txSize: 1
        autoBindDlq: false
        republishToDlq: false
        requeueRejected: true
        durableSubscription: false
        expires: 600000

As you see above we've tried to make main queue consumer transactional but it didn't solve our issue.

1 Answers1

0

By default, a CachingConnectionFactory is used; since channels are cached, there is no guarantee that the same channel is used when publishing on the same thread; in high volume, multi-threaded environments, this can cause out of order delivery.

You can avoid this issue by defining a ThreadLocalChannelConnectionFactory @Bean instead; then Boot will no longer auto configure the CCF and your bean will be used instead.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#choosing-factory

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Gary, with ThreadChannelConnectionFactory we get error: reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'com.some.Query-5ARULNhgReyKysHQq7OfHQ' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the origina..., class-id=50, method-id=10). Seems like due to both RabbitAdmin and BlockingQueueConsumer are trying to declare a queue. We don't have such issue with CachingConnectionFactory for some reason although both RabbitAdmin and BlockingQueueConsumer declare same queue. – Yury Yaroshevich Dec 19 '22 at 12:31
  • That appears to be an anonymous (autoDelete, exclusive) queue from somewhere else in the application and has nothing to do with the binding in the configuration above. You can't use such queues with the TLCCF. – Gary Russell Dec 19 '22 at 13:54
  • I've added queryConsumer binding in the configuration above. So you're saying for such kind of queues we can't use ThreadLocalChannelConnectionFactory, right? – Yury Yaroshevich Dec 19 '22 at 14:36
  • Not with an anonymous group - if you add `group` (like the other binding), it will work. – Gary Russell Dec 19 '22 at 14:44
  • Is it possible to use different connection factories for different bindings? – Yury Yaroshevich Dec 19 '22 at 14:53
  • seems like yes https://stackoverflow.com/questions/34416404/spring-amqp-with-two-connectionfactory – Yury Yaroshevich Dec 19 '22 at 15:02
  • No; that's for stand alone spring-amqp; the binder uses the same connection factory for all bindings. You could work around it by loading two application contexts. – Gary Russell Dec 19 '22 at 15:08