
In an effort to learn Apache Kafka, I've developed a Spring Boot application that sends messages to a Kafka topic when I send a POST request to a controller that calls a KafkaTemplate send method. I'm running Ubuntu 19.04 and successfully set up and installed Kafka and Zookeeper locally. Everything works fine.

The problem happens when I shut down either Zookeeper or Kafka. If I do this, then on startup the Kafka AdminClient of my application periodically tries to find a broker and logs this message to the console:

Connection to node -1 could not be established. Broker may not be available.

I implemented the fixes suggested here: "Kafka + Zookeeper: Connection to node -1 could not be established. Broker may not be available" and here: "Spring-Boot and Kafka: How to handle broker not available?". But if I run `mvn clean install`, the build never finishes when Zookeeper and Kafka aren't running. Why is this, and is there a way to configure the application so that it checks for Kafka availability on startup and gracefully handles the service being unavailable?

Here is my service class that calls the KafkaTemplate:

@Autowired
public PingMessageServiceImpl(KafkaTemplate<String, String> kafkaTemplate, KafkaTopicConfiguration kafkaTopicConfiguration) {
    this.kafkaTemplate = kafkaTemplate;
    this.kafkaTopicConfiguration = kafkaTopicConfiguration;
}

@Override
public void sendMessage(String message) {
    log.info(String.format("Received following ping message %s", message));

    if (!isValidPingRequest(message)) {
        log.warn("Received invalid ping request");
        throw new InvalidPingRequestException();
    }
    log.info(String.format("Sending message=[%s]", message));
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(kafkaTopicConfiguration.getPingTopic(), message);
    future.addCallback(buildListenableFutureCallback(message));
}

private boolean isValidPingRequest(String message) {
    return "ping".equalsIgnoreCase(message);
}

private ListenableFutureCallback<SendResult<String, String>> buildListenableFutureCallback(String message) {
    return new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info(String.format("Sent message=[%s] with offset=[%d]", message, result.getRecordMetadata().offset()));
        }
        @Override
        public void onFailure(Throwable ex) {
            // log send failures at error level rather than info
            log.error(String.format("Unable to send message=[%s] due to %s", message, ex.getMessage()));
        }
    };
}

Here is the configuration class that I use to extract the Kafka configuration properties from the properties file:

@NotNull(message = "bootstrapAddress cannot be null")
@NotBlank(message = "bootstrapAddress cannot be blank")
private String bootstrapAddress;

@NotNull(message = "pingTopic cannot be null")
@NotBlank(message = "pingTopic cannot be blank")
private String pingTopic;

@NotNull(message = "reconnectBackoffMs cannot be null")
@NotBlank(message = "reconnectBackoffMs cannot be blank")
@Value("${kafka.reconnect.backoff.ms}")
private String reconnectBackoffMs;

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configurations = new HashMap<>();
    configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configurations.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
    return new KafkaAdmin(configurations);
}

@Bean
public NewTopic pingTopic() {
    return new NewTopic(pingTopic, 1, (short) 1);
}

@PostConstruct
private void displayOnStartup() {
    log.info(String.format("bootstrapAddress is %s", bootstrapAddress));
    log.info(String.format("reconnectBackoffMs is %s", reconnectBackoffMs));
}
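For reference, the matching entries in my `application.properties` look something like this (the key names for the bootstrap address and topic are illustrative, since the exact binding isn't shown above):

```properties
# broker address the KafkaAdmin and KafkaTemplate connect to
kafka.bootstrap.address=localhost:9092
# topic the ping messages are sent to
kafka.ping.topic=ping
# how long the AdminClient waits before retrying a failed connection
kafka.reconnect.backoff.ms=10000
```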
Space Cadet
  • do you have any tests? i would like to see them if you do – Ryuzaki L Jul 21 '19 at 16:22
  • I have plenty of tests for other classes in the application but none for the service that uses Kafka. – Space Cadet Jul 21 '19 at 16:23
  • are those integration test annotated with `@SpringBootTest`? – Ryuzaki L Jul 21 '19 at 16:24
  • Some of them are, yes – Space Cadet Jul 21 '19 at 16:24
  • 1
    That is the issue, when you load `ApplicationContext` during integration test, kafka Beans `KafkaAdmin`, `KafkaTemplate` will try to connect with kafka server, you can simply mock those to avoid such type of scenarios – Ryuzaki L Jul 21 '19 at 16:26
  • Thank you, I understand. Is there a way to mock the `Kafka` instance so that the Beans will connect to it? Or is there a way to tell the integration tests to ignore loading the `Kafka` beans? – Space Cadet Jul 21 '19 at 16:27
  • 1
    yes you can use spring embedded Kafka server but still there is a work around with it, i will say just mock `KafkaTemplate` if producing and `KafkaAdmin` client you should be good – Ryuzaki L Jul 21 '19 at 16:30

1 Answer

If you have any Spring Boot integration tests, then while loading the ApplicationContext the Spring Kafka beans such as KafkaTemplate and KafkaAdmin will try to connect to the Kafka server using the properties specified in your yml or properties file.

To avoid this you can use the Spring embedded Kafka broker (from spring-kafka-test), so that the Kafka beans connect to the embedded broker during test execution.
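A minimal sketch of such a test, assuming the spring-kafka-test dependency is on the test classpath (the property key `kafka.bootstrap.address`, the topic name, and the service class names are assumptions based on your snippets, not your actual code):

```java
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;

@SpringBootTest
// starts an in-process Kafka broker for the duration of the test
@EmbeddedKafka(partitions = 1, topics = {"ping"})
@TestPropertySource(properties = {
        // redirect the application's bootstrap address to the embedded broker;
        // spring.embedded.kafka.brokers is set by @EmbeddedKafka at runtime
        "kafka.bootstrap.address=${spring.embedded.kafka.brokers}"
})
class PingMessageServiceEmbeddedKafkaTest {

    @Autowired
    private PingMessageService pingMessageService;

    @Test
    void sendMessageSucceedsAgainstEmbeddedBroker() {
        // the KafkaTemplate sends to the embedded broker, so the context
        // loads and the send completes without a real Kafka installation
        pingMessageService.sendMessage("ping");
    }
}
```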

Or, more simply, you can mock the KafkaTemplate and KafkaAdmin beans using the @MockBean annotation in your integration test cases.
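A sketch of the mocking approach (again, the `PingMessageService` interface name is assumed from your `PingMessageServiceImpl`; `SettableListenableFuture` is Spring's settable implementation of `ListenableFuture`, returned here so your `addCallback` call has something to attach to):

```java
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

@SpringBootTest
class PingMessageServiceMockedKafkaTest {

    // replacing these beans with mocks means no connection to a broker
    // is attempted when the ApplicationContext loads
    @MockBean
    private KafkaTemplate<String, String> kafkaTemplate;

    @MockBean
    private KafkaAdmin kafkaAdmin;

    @Autowired
    private PingMessageService pingMessageService;

    @Test
    void sendMessageDelegatesToKafkaTemplate() {
        // return a future the service can register its callback on
        given(kafkaTemplate.send(anyString(), anyString()))
                .willReturn(new SettableListenableFuture<>());

        pingMessageService.sendMessage("ping");

        verify(kafkaTemplate).send(anyString(), eq("ping"));
    }
}
```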

Ryuzaki L