I'm working on a Spring Boot application that must process messages sent via Kafka. In the application, I have a Spring bean with methods annotated with @KafkaListener. These methods are called when a message is read from the corresponding topic, and they call the business logic that performs the actual processing.

Now I'd like to create a test for this using the library "spring-kafka-test", i.e. embedded Kafka. I've created a test that sends a message to the topic and then checks whether the business logic has been called. The logic is mocked via @MockBean.

My test class looks like this:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(
    topics = { "myTopic" },
    brokerProperties = { "listeners=PLAINTEXT://localhost:9092" }  // <-- Why is this needed?
)
class KafkaListenerDispatchingTest {

    /** Object for sending the test messages */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @MockBean
    private BusinessLogic businessLogic;

    @Test
    public void testProcessing() throws Exception {
        var testMessage = "...";
        kafkaTemplate.send("myTopic", testMessage).get();
        Mockito.verify(businessLogic).myMethod();
    }
}

Without the line marked with "Why is this needed" the test does not work. In the console I see many lines with

[Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

With the marked line the test works OK.

My question is: Why is the broker property "listeners" needed? From the Spring annotation (@EmbeddedKafka) I'd expect everything to work with the default values, and that special values must be specified only for special needs.

I have

  • spring-boot 2.7.13
  • spring-kafka 2.9.9
  • spring-kafka-test 2.9.9
fml2

1 Answer

The special need in this case is a random port for the embedded broker(s) combined with Spring Boot auto-configuration. By default, the auto-configuration indeed expects a Kafka broker on localhost:9092, but you can reconfigure the embedded broker so that the random port is picked up: bootstrapServersProperty = "spring.kafka.bootstrap-servers". This way the EmbeddedKafkaBroker exposes the system property that Spring Boot expects from us: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.kafka.embedded
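
Applied to the test class from the question, this might look roughly like the sketch below. It is a minimal sketch, not a verified setup: BusinessLogic and myMethod() are the application's own names taken from the question, the "..." message is left elided as in the original, and the 5 second Mockito timeout is an assumption added because the listener usually runs on a different thread than the test.

```java
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(
    topics = { "myTopic" },
    // Publish the embedded broker's address (random port) under the
    // property that Spring Boot's Kafka auto-configuration reads.
    bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
class KafkaListenerDispatchingTest {

    /** Object for sending the test messages */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Application class from the question, replaced by a mock */
    @MockBean
    private BusinessLogic businessLogic;

    @Test
    void testProcessing() throws Exception {
        var testMessage = "...";
        kafkaTemplate.send("myTopic", testMessage).get();
        // Assumption: the listener is invoked asynchronously, so wait up
        // to 5 seconds for the call instead of verifying immediately.
        Mockito.verify(businessLogic, Mockito.timeout(5000)).myMethod();
    }
}
```

With this in place, the brokerProperties entry that pins the listener to localhost:9092 should no longer be necessary.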

Artem Bilan
  • It is not needed (and not recommended, better to use a random port, chosen by the OS). `brokerProperties` is provided in case you want to configure the broker - for example if you want to use transactions with a single broker instance, use `brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }` – Gary Russell Jul 20 '23 at 16:17
  • I'd be happy if it worked without this setting, but it didn't. And I don't understand why. – fml2 Jul 20 '23 at 18:49
  • Do you have the mentioned `bootstrapServersProperty = "spring.kafka.bootstrap-servers"` instead? – Artem Bilan Jul 20 '23 at 18:54
  • I set `spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}` and it worked! (I saw this here: https://stackoverflow.com/a/60645149/6023830). I'm surprised that EmbeddedKafka does not set it automatically, i.e. that the Producer and Consumer that are configured by SpringBoot are not automatically configured so that they use the broker created by EmbeddedKafka. – fml2 Jul 21 '23 at 07:34
  • https://github.com/spring-projects/spring-kafka/issues/2750 – Gary Russell Jul 24 '23 at 13:24
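
The property-based fix from the comments can also be kept next to the test instead of in a properties file. The sketch below assumes the same hypothetical test class name as in the question and relies on the embedded broker publishing its address as spring.embedded.kafka.brokers, which is the property referenced in the comment above; it has not been checked against the exact versions listed in the question.

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

// Point Spring Boot's Kafka clients at the address published by the
// embedded broker, using a placeholder resolved at context startup.
@SpringBootTest(
    properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
)
@DirtiesContext
@EmbeddedKafka(topics = { "myTopic" })
class KafkaListenerDispatchingTest {
    // fields and test method as in the question
}
```

Either variant should be enough on its own; setting both the placeholder property and bootstrapServersProperty is redundant.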