I'm writing a Kafka integration test for a simple Spring Boot application. The application simply publishes to a Kafka topic.

I am using an Embedded Kafka instance for the test. The test works perfectly fine when run through IntelliJ but fails when I run it via Gradle. It looks as though the latch countdown never reaches 0 and the test eventually times out.

Producer Config:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap-address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> articleProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> articleKafkaTemplate() {
        return new KafkaTemplate<>(articleProducerFactory());
    }
}

Producer:

public class KafkaProducer {

    // Placeholder syntax is required; a bare string would be injected literally.
    @Value("${kafka.topic-name}")
    String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) throws KafkaPublishException {
        try {
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(topic, message);
            // Block until the send has completed (or failed).
            future.get();
        } catch (Exception e) {
            throw new KafkaPublishException(e.getMessage());
        }

    }

    public String getTopicName() {
        return topicName;
    }
}

Consumer:

@Component
public class KafkaConsumerHelper {
    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {

        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    private void setPayload(String payload) {
        this.payload = payload;
    }
}

Test:

@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaProducerTest {

    @Autowired
    private KafkaProducer producer;

    @Autowired
    private KafkaConsumerHelper consumer;

    @Value("${test.topic}")
    private String topic;


    @Test
    public void shouldSuccessfullyPublishAnArticleMessageToEmbeddedKafka()
            throws Exception {

        String message = createArticle();

        producer.sendMessage(message, topic);
        consumer.getLatch().await();

        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString(message));
    }
}

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      group-id: my-id
test:
  topic: embedded-test-topic
  partitions-number: 1
  replication-factor: 1

Any idea what the issue is?

Dominika
  • The `port` property is deprecated: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html#brokerconfigs_port. Although it is still unclear why one would use `listeners` for embedded brokers anyway. Your configuration is missing this part, though: https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.kafka.embedded. I mean the proper `spring.kafka.bootstrap-servers` auto-configuration property. Otherwise it is not clear how it works for you at all. Maybe you have Apache Kafka running on your host at exactly that `9092` port – Artem Bilan Sep 02 '21 at 20:05
  • `getLatch()` doesn't return anything, so this code doesn't compile, as written. Other than that, are `kafka.topic-name` and `${test.topic}` set to the same value? – OneCricketeer Sep 02 '21 at 20:21
  • My bad, it appears that I missed a couple of things when removing the many print statements in the code. The code does compile. I also added the producer config. As for the topic names, the `kafka.topic-name` in the producer isn't actually used in the test code. I pass the topic name defined in the test as an argument to the `sendMessage` method. – Dominika Sep 02 '21 at 20:54
  • So, if you use an existing, external Kafka broker, you don't need to use that `@EmbeddedKafka` then. – Artem Bilan Sep 02 '21 at 21:08
  • That's probably where I'm going wrong. I don't want to be connecting to any existing brokers, I want to be able to use an `@EmbeddedKafka`. – Dominika Sep 02 '21 at 21:21
  • Then see the Spring Boot docs I mentioned in my first comment. You need to remap that `bootstrap-servers` property to what Embedded Kafka exposes. – Artem Bilan Sep 03 '21 at 02:07
  • Thank you @ArtemBilan. It worked once I remapped the bootstrap servers as you suggested. – Dominika Sep 03 '21 at 07:43

1 Answer


For anyone looking at this question in the future, my problem was that I was not using @EmbeddedKafka properly.

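The fix was to add the bootstrapServersProperty = "spring.kafka.bootstrap-servers" attribute to the @EmbeddedKafka annotation, so that the address of the embedded broker is exposed under the property the application reads, and to drop the fixed listeners and port broker properties: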

@EmbeddedKafka(partitions = 1, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
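An alternative described in the Spring Boot documentation section linked in the comments is to leave the annotation as plain @EmbeddedKafka(partitions = 1) and reference the property that the embedded broker sets, spring.embedded.kafka.brokers, from the test configuration. A minimal sketch of the test application.yml under that assumption (the consumer settings are copied from the question; I have not run this variant against the project above):

```yaml
spring:
  kafka:
    # spring.embedded.kafka.brokers is populated by the embedded broker at startup
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      auto-offset-reset: earliest
      group-id: my-id
```

Either way, the point is the same as in the comments: the client must connect to whatever address the embedded broker exposes, not to a hard-coded localhost:9092.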

More information in the Kafka Docs.
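Separately from the fix, the unbounded await() in the test is why a misconfigured run hangs until the build gives up instead of failing quickly. Below is a small stand-alone sketch (plain JDK, no Kafka involved; the class name BoundedLatchWait and the method awaitMessage are made up for illustration) of waiting with a timeout and checking the result:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedLatchWait {

    /** Waits up to timeoutMillis; returns true only if the latch reached zero in time. */
    public static boolean awaitMessage(CountDownLatch latch, long timeoutMillis)
            throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Case 1: another thread counts down, as receive() does when a record arrives.
        CountDownLatch delivered = new CountDownLatch(1);
        Thread listener = new Thread(delivered::countDown);
        listener.start();
        System.out.println("delivered: " + awaitMessage(delivered, 2000));

        // Case 2: nothing ever counts down, as when the consumer talks to the wrong broker.
        CountDownLatch neverDelivered = new CountDownLatch(1);
        System.out.println("never delivered: " + awaitMessage(neverDelivered, 200));
    }
}
```

In the test from the question, this would amount to asserting on the boolean returned by consumer.getLatch().await(timeout, unit), with whatever timeout suits the build, rather than calling await() with no arguments.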

Dominika