5

I have a Spring Boot project that has a Kafka listener that I want to test using Embedded Kafka. I have the Kafka Listener log out the message "record received". Which will only be be logged out if I add a Thread.sleep(1000) to the start of the method.

Test class:

@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {

    private static final String TOPIC = "my-topic";

    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Test
    void testSendEvent() throws ExecutionException, InterruptedException {
        // Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
        Producer<Integer, String> producer = configureProducer();
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
        producer.send(producerRecord).get();
        producer.close();
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}

I don't want to use the fickle Thread.sleep() The test is obviously executing before some setup processes have completed. I clearly need to wait on something, but I am not sure what nor how to do it.

Using:

  • Java 11
  • Spring Boot 2.5.6
  • JUnit 5
  • spring-kafka-test 2.7.8
Geeson
  • 51
  • 1
  • 3
  • What are you actually testing here? There's no asserts. Kafka itself has unit tests to verify producers work, so you're just duplicating tests – OneCricketeer Nov 10 '21 at 14:12
  • records from Kafka are stored in a DB and will be later retrieved, assertions will be performed on the retrieved entities. – Geeson Nov 10 '21 at 17:59
  • You are using Spring for this? There are Kafka Connectors that also have tests to verify that records can be written to certain databases. – OneCricketeer Nov 10 '21 at 18:13

3 Answers3

5

Add an @EventListener bean to the test context and (for example) count down a CountDownLatch when a ConsumerStartedEvent is received; then in the test

assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#events

and

https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption

Or add a ConsumerRebalanceListener and wait for partition assignment.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • 1
    i have the same problem and implemented your suggestion. the event is received, latch is counted down, but still the test (sending) is executed too fast, (total 6 test, first test fails) for the kafkalistener class under test resp. kafkaHandler method to catch the event. the sleep also helps in my case in beforeAll. what also helps (in beforeAll) is to set up a dummy consumer (KafkaMessageListenerContainer), call ContainerTestUtils.waitForAssignment. this takes ca 350ms and all tests passes. any tip? – Valentin Bossi Jan 23 '22 at 10:10
  • 2
    After further consideration, the rebalance listener is a better option because there is still a small race condition between when the consumer is started and the partitions are assigned. The wait for assignment utility method works similarly. Also, setting auto.offset.reset=earliest on the consumer should help. – Gary Russell Jan 23 '22 at 14:13
0

I clearly need to wait on something, but I am not sure what nor how to do it.

You need to use a different method to give Kafka time to process and route the message ...

Look at this line ...

ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);

When testing Kafka listeners we always specify a poll delay. This is because your message is given to kafka, which will then process it in another thread. And you need to wait for it.

Here's how it looks in context of the code its used in.

class UserKafkaProducerTest {
  @Test
  void testWriteToKafka() throws InterruptedException, JsonProcessingException {
      // Create a user and write to Kafka
      User user = new User("11111", "John", "Wick");
      producer.writeToKafka(user);

      // Read the message (John Wick user) with a test consumer from Kafka and assert its properties
      ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
      assertNotNull(message);
      assertEquals("11111", message.key());
      User result = objectMapper.readValue(message.value(), User.class);
      assertNotNull(result);
      assertEquals("John", result.getFirstName());
      assertEquals("Wick", result.getLastName());
  }
}

This is a code piece from this article, which makes stuff clear.

Arthur Klezovich
  • 2,595
  • 1
  • 13
  • 17
  • That code snippet is simply testing that the embedded Kafka testing framework is working. I do not want to assert that the record I add is consumed as I added it. Seems a bit pointless. I'd like my listener that is part of the main application logic to consume a record. I don't want my tests to consume anything. – Geeson Nov 10 '21 at 12:42
  • @Geeson The test verifies that the deserializer works as expected. Either you can test the deserializer directly, or via consumption – OneCricketeer Nov 10 '21 at 14:10
  • 1
    Listen for a `ConsumerStartedEvent`. Or add a `ConsumerRebalanceListener`. – Gary Russell Nov 10 '21 at 14:11
  • @Geeson How will you do an assert to test that you test is succesful ? – Arthur Klezovich Nov 10 '21 at 14:12
  • Records from the Kafka queue are processed and stored in a database. The assertions will be performed further down stream on entities retrieved from the db. This is largely an unimportant fact. I simply want my test to be able to produce a record and hit my application listener without using thread.sleep() – Geeson Nov 10 '21 at 17:54
  • You can't avoid waiting. Might as well use a proper API to do it https://github.com/awaitility/awaitility – Arthur Klezovich Nov 10 '21 at 18:07
0

You can use this small library for testing. All output records will be collected to blocking queue and you can poll them with timout:

    @OutputQueue(topic = TOPIC_OUT, partitions = 1)
    private BlockingQueue<ConsumerRecord<String, String>> consumerRecords;

    @Test
    void shouldFilterRecordWithoutHeader() throws ExecutionException, InterruptedException, TimeoutException {
        final String messageIn = "hello world";
        try (var producer = producer()) {
            producer.send(new ProducerRecord<>(TOPIC_IN, messageIn)).get(5, TimeUnit.SECONDS);
        }
        ConsumerRecord<String, String> record = consumerRecords.poll(5, TimeUnit.SECONDS);
        Assertions.assertThat(record).isNotNull();
    }
Vichukano
  • 99
  • 8