27

I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server an instance of Zookeeper. So, I decided to use Spring Embedded Kafka.

The definition of my listener is very basic.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

Also the test, that verifies the latch counter to be equal to zero after receiving a message, is very easy.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?

All the code is shared in my GitHub repository kafka-listener.

Thanks to all.

riccardo.cardin
  • 7,971
  • 5
  • 57
  • 106

2 Answers2

17

You are probably sending the message before the consumer has been assigned the topic/partition. Set property...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...it defaults to latest.

This is like using --from-beginning with the console consumer.

EDIT

Oh; you're not using boot's properties.

Add

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

BTW, you should probably also do a get(10L, TimeUnit.SECONDS) on the result of the template.send() (a Future<>) to assert that the send was successful.

EDIT3

To override the offset reset just for the test, you can do the same as what you did for the broker addresses:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

and

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.

Also, I would recommend setting enable.auto.commit to false so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • 1
    Thank you. Set the `auto.offset.reset` property to `earliest` made the magic :) – riccardo.cardin May 02 '18 at 08:04
  • However, if I need to use `latest` as `auto.offset.reset` value? How can I make the test works? Thanks a lot. – riccardo.cardin May 02 '18 at 09:24
  • How is this ever testing the "Listener" class that contains the @KafkaListener method? I see the CountDownLatch assertion, but never any assertion that the method was hit... – Rob May 21 '19 at 19:23
  • ? the code in the `@KafkaListener` method counts down the latch, hence it was called.Typically, however, you wouldn't do it like this. Most likely the listener invokes a service and you would inject a mock or stubbed service instead. – Gary Russell May 21 '19 at 19:42
1

Maybe someone will find this useful. I had a similar problem. Locally tests were running (some checks were performed within Awaitility.waitAtMost) but in the Jenkins pipeline, tests were failing.

The solution was, like already mentioned in the most voted answer, setting auto-offset-reset=earliest. When tests are running, you can check if you set the configuration properly by looking into test logs. Spring outputs configuration for both producer and consumer

riccardo.cardin
  • 7,971
  • 5
  • 57
  • 106
Pawel
  • 466
  • 1
  • 7
  • 20