2

we are upgrading spring boot 2.2.6.release to 2.7.8 and facing below issue on Kafka listener.

Scenario -1:

Previously we were using SCS 3.0.4.Release and using @StreamListener to consume Kafka messages. we have below one scenario where one single Kafka channel TEST_CHANNEL used by 2 different @StreamListener. when any message published to TEST_TOPIC, both the listener receive a copy of every message and processed it. But my understanding was due to a single consumer group TEST_TOPIC_GROUP message should be consumed by each listener in round-robin fashion, only one listener should be consume the message and processed it at a time. Below is sample configuration that I did in my project.

@Component
public class TestListener1 {   

    @StreamListener("TEST_CHANNEL")
    public void handle(final Message<String> message) {
        log.info("execute Test Listener 1" );        
    }
}

@Component
public class TestListener2 {   

    @StreamListener("TEST_CHANNEL")
    public void handle(final Message<String> message) {
        log.info("execute Test Listener 2" );        
    }
}

configuration:

spring:
  cloud:
    stream:
      bindings:
        TEST_CHANNEL:
          binder: kafka
          content-type: application/json
          destination: TEST_TOPIC
          group: TEST_TOPIC_GROUP
    

Scenario -2:

After upgrade to SCS 3.2.6, @StreamListener is deprecated and now we are going to use Consumer interface, our Kafka listener will be implements Consumer Interface. Here we have 2 different channels (TEST_CHANNEL_1, TEST_CHANNEL_2), having same destination (TEST_TOPIC) and consumer group (TEST_TOPIC_GROUP). In this case, both listeners will receive messages, but the messages was distributed between the two listeners in a round-robin fashion. each message copy not consumed and processed by two listener as described in scenario 1.

@Component("TEST_CHANNEL_1")
public class TestListener1 implements Consumer<Message<String>> {
    
    @Override
    public void accept(final Message<String> message) {
        log.info("execute Test Listener 1" );        
    }
}

@Component("TEST_CHANNEL_2")
public class TestListener2 implements Consumer<Message<String>> {

    @Override    
    public void accept(final Message<String> message) {
        log.info("execute Test Listener 2" );        
    }
}

configuration:

spring:
  cloud:
    stream:
      bindings:
        TEST_CHANNEL_1:
          binder: kafka
          content-type: application/json
          destination: TEST_TOPIC
          group: TEST_TOPIC_GROUP
        TEST_CHANNEL_2:
          binder: kafka
          content-type: application/json
          destination: TEST_TOPIC
          group: TEST_TOPIC_GROUP

Did someone faced this issue earlier, please suggest ?

1 Answers1

1

In the first case, you only have a single binding. All listener methods on this same single binding are invoked when the binding receives messages. In this scenario (1), you have one single binding --> consumer group --> multiple listener methods on the binding. Spring Cloud Stream will treat this as a single consumer and invoke all the methods with data arrived on the binding. Basically, the binder creates a single consumer for this binding and invokes all the methods attached to the binding.

In the second scenario, things have different semantic meanings. There, you have two separate distinct bindings under the same consumer group. Although they might be in the same application, those consumer methods are considered as separate consumers by Spring Cloud Stream and, ultimately, by Kafka. Therefore, you have the correct natural behavior in this case, i.e., you have two consumers under the same consumer group receiving data in a round-robin fashion (competing consumers).

If you want to restore the original behavior with StreamListener, we suggest removing the consumer group (anonymous consumers) or placing the two consumer bindings under two different consumer groups so that both bindings will receive all the data from the topic.

sobychacko
  • 5,099
  • 15
  • 26