we are upgrading spring boot 2.2.6.release to 2.7.8 and facing below issue on Kafka listener.
Scenario -1:
Previously we were using SCS 3.0.4.Release and using @StreamListener to consume Kafka messages. we have below one scenario where one single Kafka channel TEST_CHANNEL used by 2 different @StreamListener. when any message published to TEST_TOPIC, both the listener receive a copy of every message and processed it. But my understanding was due to a single consumer group TEST_TOPIC_GROUP message should be consumed by each listener in round-robin fashion, only one listener should be consume the message and processed it at a time. Below is sample configuration that I did in my project.
@Component
public class TestListener1 {
@StreamListener("TEST_CHANNEL")
public void handle(final Message<String> message) {
log.info("execute Test Listener 1" );
}
}
@Component
public class TestListener2 {
@StreamListener("TEST_CHANNEL")
public void handle(final Message<String> message) {
log.info("execute Test Listener 2" );
}
}
configuration:
spring:
cloud:
stream:
bindings:
TEST_CHANNEL:
binder: kafka
content-type: application/json
destination: TEST_TOPIC
group: TEST_TOPIC_GROUP
Scenario -2:
After upgrade to SCS 3.2.6, @StreamListener is deprecated and now we are going to use Consumer interface, our Kafka listener will be implements Consumer Interface. Here we have 2 different channels (TEST_CHANNEL_1, TEST_CHANNEL_2), having same destination (TEST_TOPIC) and consumer group (TEST_TOPIC_GROUP). In this case, both listeners will receive messages, but the messages was distributed between the two listeners in a round-robin fashion. each message copy not consumed and processed by two listener as described in scenario 1.
@Component("TEST_CHANNEL_1")
public class TestListener1 implements Consumer<Message<String>> {
@Override
public void accept(final Message<String> message) {
log.info("execute Test Listener 1" );
}
}
@Component("TEST_CHANNEL_2")
public class TestListener2 implements Consumer<Message<String>> {
@Override
public void accept(final Message<String> message) {
log.info("execute Test Listener 2" );
}
}
configuration:
spring:
cloud:
stream:
bindings:
TEST_CHANNEL_1:
binder: kafka
content-type: application/json
destination: TEST_TOPIC
group: TEST_TOPIC_GROUP
TEST_CHANNEL_2:
binder: kafka
content-type: application/json
destination: TEST_TOPIC
group: TEST_TOPIC_GROUP
Did someone faced this issue earlier, please suggest ?