52

Edit, FYI: working GitHub example


I searched the internet and couldn't find a simple, working example of an embedded Kafka test.

My setup is:

  • Spring Boot
  • Multiple @KafkaListener methods with different topics in one class
  • Embedded Kafka for the test, which starts fine
  • A test using KafkaTemplate that sends to the topic, but the @KafkaListener methods receive nothing, even after a very long sleep
  • No warnings or errors are shown, only INFO spam from Kafka in the logs

Please help me. Most of the examples I found are over-configured or over-engineered. I am sure it can be done simply. Thanks, guys!

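// Imports assume SLF4J, which the {} placeholder in the log call suggests
import static org.slf4j.LoggerFactory.getLogger;

import org.slf4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;

@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        // I will do database work here, which I could then check in the DB for testing
    }
}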

private static final String SENDER_TOPIC = "test.kafka.topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
public void testSend() throws InterruptedException, ExecutionException {

    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
    // Give the listener time to consume the messages
    Thread.sleep(10000);
}
Yuna Braska
  • Show the code. See if this helps: https://stackoverflow.com/questions/48682745/embeddedkafka-how-to-check-received-messages-in-unit-test/48687229#48687229 – pvpkiran Feb 12 '18 at 18:30
  • @pvpkiran This is still not working. The test only tests itself and never reaches my KafkaListener when I take just the part that sends to my topic. – Yuna Braska Feb 12 '18 at 18:57
  • It’s not clear from your test code how that `KafkaController` is involved. How are you sure that the listener is started? – Artem Bilan Feb 13 '18 at 04:32
  • @ArtemBilan Because there is the [@KafkaListener] annotation on the method. Or do I have to do something else? – Yuna Braska Feb 13 '18 at 07:23
  • Right, the test needs to bootstrap an application context with that component – Artem Bilan Feb 13 '18 at 13:01
  • @ArtemBilan It's Spring Boot; the context is starting with the test annotations [@RunWith(SpringRunner.class) @SpringBootTest]. Unfortunately, even this example fails for me: https://www.codenotfound.com/spring-kafka-boot-example.html – Yuna Braska Feb 13 '18 at 13:34
  • Right, but how does that `@SpringBootTest` know about your `KafkaController` component? How is it scanned or configured? – Artem Bilan Feb 13 '18 at 14:00
  • Pay attention to how that sample has the `@SpringBootApplication` class in the same package as the `@SpringBootTest`, and the `Receiver` and `Sender` components are in nested packages. So they are all clearly scanned and configured, and that's how it works. If your `@SpringBootTest` is in a different package, your components are not visible and you should provide some `@Configuration` class. – Artem Bilan Feb 13 '18 at 14:03
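
The package rule described in the last comment can be illustrated with a plain-Java check. This is only a simplified model of "same package or a nested package", not Spring's actual scanning code; the class name `ScanCheck` and the package names are made up for illustration.

```java
/** Simplified model of which packages a base-package component scan covers. */
final class ScanCheck {

    /** True if candidatePackage is basePackage itself or one of its sub-packages. */
    static boolean isCovered(String basePackage, String candidatePackage) {
        // The trailing dot avoids treating "com.example.application" as nested in "com.example.app"
        return candidatePackage.equals(basePackage)
                || candidatePackage.startsWith(basePackage + ".");
    }

    public static void main(String[] args) {
        // Hypothetical package of the @SpringBootApplication class
        String appPackage = "com.example.app";
        String[] candidates = {"com.example.app", "com.example.app.kafka", "com.example.other"};
        for (String candidate : candidates) {
            System.out.println(candidate + " covered: " + isCovered(appPackage, candidate));
        }
    }
}
```

If the package of the listener class (or of the test) comes out as not covered, that would match the situation in the comment where the component has to be listed explicitly or provided through a `@Configuration` class.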

4 Answers

42

Embedded Kafka tests work for me with the configuration below.

Annotations on the test class:

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // List the @KafkaListener class here if it is a different class and is not loaded by the test config
@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333", 
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

Setup method annotated with @Before:

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

Note: I am not using @ClassRule to create the embedded Kafka broker; I autowire it instead (see the @Autowired KafkaEmbedded field above).

@Test
public void testReceive() throws Exception {
    kafkaTemplate.send(topic, data);
}
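
The test above only sends. To check that the listener actually received something without a long `Thread.sleep`, one common option is to let the listener count down a `CountDownLatch` that the test waits on with a timeout. Below is a minimal plain-Java sketch of that idea. There is no Kafka in it: the single-thread executor merely stands in for the listener container thread, and `LatchedListener` and `sendAndAwait` are hypothetical names, not part of spring-kafka.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LatchedListenerDemo {

    /** Stand-in for a @KafkaListener bean: stores the payload and releases the latch. */
    static final class LatchedListener {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String lastPayload;

        void receive(String payload) {
            lastPayload = payload;
            latch.countDown();
        }

        boolean awaitMessage(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }

        String lastPayload() {
            return lastPayload;
        }
    }

    /** Delivers the payload on another thread; returns what the listener saw, or null on timeout. */
    static String sendAndAwait(String payload, long timeoutMillis) {
        LatchedListener listener = new LatchedListener();
        // Simulates the container thread that would normally invoke the listener
        ExecutorService container = Executors.newSingleThreadExecutor();
        try {
            container.submit(() -> listener.receive(payload));
            boolean received = listener.awaitMessage(timeoutMillis, TimeUnit.MILLISECONDS);
            return received ? listener.lastPayload() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            container.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndAwait("message00", 5_000));
    }
}
```

In a real test the latch would live in the listener bean (or a test-only subclass of it), and the test would assert that the wait returned true before checking the database or the payload.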

Hope this helps!

Edit: Test configuration class marked with @TestConfiguration

@TestConfiguration
public class TestConfig {

    // The embedded broker bean registered by @EmbeddedKafka
    @Autowired
    private KafkaEmbedded kafkaEmbedded;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic(topic); // topic: the topic name, defined elsewhere
        return kafkaTemplate;
    }
}

The test class can now autowire the KafkaTemplate and use it to send a message:

kafkaTemplate.send(topic, data);

I have updated the code block in the answer with the line above.

166_MMX
donm
  • Thanks! This sounds great, but where do [@EmbeddedKafka] and [kafkaListenerEndpointRegistry] come from? Can you post a full example with imports? – Yuna Braska Feb 13 '18 at 07:29
  • Since we have annotated our class with the `@EnableKafka` and `@EmbeddedKafka` annotations, you can autowire both in the test class. In the first code block of the answer, `@Autowired KafkaEmbedded kafkaEmbedded` is already there; you can autowire `kafkaListenerEndpointRegistry` the same way. – donm Feb 13 '18 at 14:13
  • I get the same error every time while testing different solutions: ERROR org.springframework.kafka.support.LoggingProducerListener:76 - Exception thrown when sending a message with key='null' and payload='Hello Message!' to topic myTopic: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. – Yuna Braska Feb 13 '18 at 16:07
  • I have just updated the `@TestConfiguration` code in the answer. Hope it helps! – donm Feb 13 '18 at 16:21
  • I still don't know where "@EmbeddedKafka" comes from. Which dependency is needed for it? I am currently using "spring-kafka-test". – Yuna Braska Feb 14 '18 at 13:03
  • `spring-kafka-test` has `@EmbeddedKafka`; you should be able to use `@Autowired KafkaEmbedded kafkaEmbeded;` – donm Feb 14 '18 at 21:46
  • Okay, found it! [@EmbeddedKafka] is not included in newer versions of "spring-kafka-test" – Yuna Braska Feb 15 '18 at 18:35
  • `TestConfig` is best declared as an inner class within `KafkaConsumerTest`. In this case: a) It must be `static` b) `KafkaEmbedded` must be injected as a parameter of the method `producerFactory` c) Inject `ProducerFactory` as a parameter to the method `kafkaTemplate` and then use it instead of calling `producerFactory()`. – dav.garcia Oct 19 '18 at 22:02
  • This embedded Kafka solution is not working with Cucumber tests. – legend Dec 27 '19 at 05:09
  • The `setup()` method with `ContainerTestUtils.waitForAssignment(..)` was gold for us. We saw the consumer hang after another test class had run, which caused the consumer in the next test to receive nothing. We also use `@DirtiesContext(AFTER_CLASS)`. – Tom AsIdea Oct 11 '21 at 13:34
  • `KafkaEmbedded` is deprecated. I doubt this still works as of Sept 2022. – StackOverFlow Sep 12 '22 at 13:49
22

The accepted answer doesn't compile or work for me, so I found another solution, based on https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/, which I would like to share with you.

The dependency is 'spring-kafka-test' version: '2.2.7.RELEASE'

@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {

    private static final String TEST_TOPIC = "testTopic";

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void testReceivingKafkaEvents() {
        Consumer<Integer, String> consumer = configureConsumer();
        Producer<Integer, String> producer = configureProducer();

        producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));

        ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo(123);
        assertThat(singleRecord.value()).isEqualTo("my-test-value");

        consumer.close();
        producer.close();
    }

    private Consumer<Integer, String> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
                .createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}
Sylhare
LazR
8

I solved the issue now

@BeforeClass
public static void setUpBeforeClass() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

While debugging, I saw that the embedded Kafka server takes a random port.

I couldn't find the configuration for it, so I am setting the Kafka config to match the server. It still looks a bit ugly to me.

I would love to use just the line that @Mayur mentioned:

@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})

but I can't find the right dependency on the internet.
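
On the random port mentioned above: I have not checked how the embedded broker picks its port internally, but the general idea of a runtime-assigned port can be shown in plain Java by binding to port 0 and asking which port the OS handed out. `FreePortDemo` is a made-up name for this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

final class FreePortDemo {

    /** Asks the OS for a currently free TCP port by binding to port 0. */
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The value differs from run to run, so it cannot be hard-coded in a properties file
        int port = findFreePort();
        System.out.println("bootstrap address would be localhost:" + port);
    }
}
```

Because the port is only known once the server has started, the client configuration has to be filled in at runtime, which is what the `System.setProperty` calls in `setUpBeforeClass` do. Note that the socket in this sketch is closed again before the method returns, so another process could grab the port in the meantime.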

Sylhare
Yuna Braska
  • You can set spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers} in your application.properties for the test; that should work. This is filled in by EmbeddedKafka with the random port it was assigned on startup. – Flashpix Jan 25 '19 at 10:31
  • In my case the @EmbeddedKafka annotation came from spring-kafka-test 2.6.5. I have a dependency on spring-kafka-test in my pom and I'm using Spring Boot 2.4.2. @Sylhare – Aldo Inácio da Silva Feb 05 '21 at 17:08
2

In integration tests, fixed ports such as 9092 are not recommended, because each test should be free to open its own port from its own embedded instance. The following implementation does that.

NB: this implementation is based on JUnit 5 (Jupiter 5.7.0) and Spring Boot 2.3.4.RELEASE.

TestClass:

@EnableKafka
@SpringBootTest(classes = {ConsumerTest.Config.class, Consumer.class})
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConsumerTest {

    @Autowired
    private EmbeddedKafkaBroker kafkaEmbedded;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @BeforeAll
    public void setUp() throws Exception {
        for (final MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbedded.getPartitionsPerTopic());
        }
    }

    @Value("${topic.name}")
    private String topicName;

    @Autowired
    private KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate;

    @Test
    public void consume_success() {
        requestKafkaTemplate.send(topicName, load);
    }


    @Configuration
    @Import({
            KafkaListenerConfig.class,
            TopicConfig.class
    })
    public static class Config {

        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public ProducerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestProducerFactory() {
            final Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate() {
            return new KafkaTemplate<>(requestProducerFactory());
        }
    }
}

Listener Class:

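@Component
public class Consumer {
    @KafkaListener(
            topics = "${topic.name}",
            // Must match the name of the container factory bean declared in KafkaListenerConfig
            containerFactory = "resolvedTreeListenerContainerFactory"
    )
    // No @Override here: the class does not implement an interface declaring this method
    public void listener(
            final ConsumerRecord<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> consumerRecord,
            final @Payload Optional<Map<String, List<ImmutablePair<String, String>>>> payload
    ) {
        // Handle the record here
    }
}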

Listener Config:

@Configuration
public class KafkaListenerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name}")
    private String resolvedTreeQueueName;

    @Bean
    public ConsumerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeConsumerFactory() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, resolvedTreeQueueName);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CustomDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(resolvedTreeConsumerFactory());
        return factory;
    }

}

TopicConfig:

@Configuration
public class TopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name}")
    private String requestQueue;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic requestTopic() {
        return new NewTopic(requestQueue, 1, (short) 1);
    }
}

application.properties:

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

This is the most important setting: it binds the embedded instance's address (including its port) to the KafkaTemplate and the KafkaListeners.

With the implementation above, each test class can open its own dynamic port, which is more convenient.
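
To make the effect of that property line more concrete, here is a small plain-Java sketch of `${...}` placeholder substitution. It is a toy resolver, not Spring's implementation (which also handles defaults, nesting and more); `PlaceholderDemo` and the address `127.0.0.1:54321` are invented for the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderDemo {

    // Matches ${some.key} and captures some.key
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} in value with properties.get(key); unknown keys raise an error. */
    static String resolve(String value, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String replacement = properties.get(key);
            if (replacement == null) {
                throw new IllegalArgumentException("Could not resolve placeholder: " + key);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Hypothetical value that the embedded broker would publish at startup
        Map<String, String> environment =
                Collections.singletonMap("spring.embedded.kafka.brokers", "127.0.0.1:54321");
        System.out.println(resolve("${spring.embedded.kafka.brokers}", environment));
    }
}
```

The point is only that `spring.kafka.bootstrap-servers` ends up with whatever address the embedded broker reported, so nothing port-specific needs to be written in the test configuration.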

Janitha Madushan