I have something like the code below, which works well, but I would prefer to check health without sending any message (and not just by checking the socket connection). I know Kafka has something like KafkaHealthIndicator out of the box. Does anyone have experience with it, or an example of using it?

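import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaHealthIndicator implements HealthIndicator {
    private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

    private final KafkaTemplate<String, String> kafka;

    public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
        this.kafka = kafka;
    }

    @Override
    public Health health() {
        try {
            // Send a probe message and wait briefly for the send to complete.
            kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return Health.down(e).build();
        }
        return Health.up().build();
    }
}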
Michal Foksa
icecool09
  • I have seen developers in my organisation copy this code without any change. Whenever you send a Kafka message to a topic, you should include a processing time. This topic could also be used by multiple services in a microservice system, so it is better to send the service name as well. Something like: kafka.send("kafka-health-indicator", "ProcessingTime : " + LocalDateTime.now(ZoneOffset.UTC) + " , Service : myService"); – akash777.sharma May 13 '22 at 06:20
  • https://stackoverflow.com/a/74233687/2872157 I came up with a simple solution: if any Kafka producer/consumer connections are active, then Kafka is probably working. – vyacheslav.kislov Oct 28 '22 at 10:17
  • In my case, AWS MSK and AWS Secrets Manager are involved. Will the Kafka admin client still work with only the bootstrap server value set in the admin configuration parameters? If not, can someone please share the code for pulling the topic user name and password from the AWS secrets ARN? Thanks – acearch Jun 30 '23 at 09:42

2 Answers

To trip the health indicator, retrieve data from one of the future objects; otherwise the indicator reports UP even when Kafka is down.

When Kafka is not connected, future.get() throws an exception, which in turn sets the indicator DOWN.

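import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration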
public class KafkaConfig {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    @Bean
    public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
        final DescribeClusterOptions options = new DescribeClusterOptions()
            .timeoutMs(1000);

        return new AbstractHealthIndicator() {
            @Override
            protected void doHealthCheck(Health.Builder builder) throws Exception {
                DescribeClusterResult clusterDescription = kafkaAdminClient.describeCluster(options);

                // To trip the health indicator DOWN, retrieve data from one of the
                // futures; otherwise the indicator is UP even when Kafka is down.
                // When Kafka is not connected, future.get() throws an exception,
                // which in turn sets the indicator DOWN.
                clusterDescription.clusterId().get();
                // or clusterDescription.nodes().get().size()
                // or clusterDescription.controller().get();

                // No build() call is needed: AbstractHealthIndicator builds the
                // Health object from this builder after doHealthCheck returns.
                builder.up();

                // Alternatively, use data from the futures directly as health details.
                builder.up()
                        .withDetail("clusterId", clusterDescription.clusterId().get())
                        .withDetail("nodeCount", clusterDescription.nodes().get().size());
            }
        };
    }

}
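
Why a future has to be resolved can be seen without Kafka at all. The following is a minimal plain-Java sketch, not the code above: `FutureHealthProbe`, `Status` and `probe` are hypothetical names, and `CompletableFuture` merely stands in for the `KafkaFuture` objects returned by the admin client. It assumes that an unreachable broker shows up either as a failed future or as one that never completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: plain-Java stand-in for a health check on an async result.
final class FutureHealthProbe {

    enum Status { UP, DOWN }

    // Creating a future never fails by itself; only get() surfaces the error
    // or the timeout, so the probe must block on it for a bounded time.
    static Status probe(Future<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Status.UP;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return Status.DOWN;
        } catch (ExecutionException | TimeoutException e) {
            return Status.DOWN;
        }
    }

    public static void main(String[] args) {
        // Completed normally: stands in for a reachable cluster.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("cluster-1");

        // Completed exceptionally: stands in for a failed request.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unreachable"));

        // Never completed: stands in for a request that hangs.
        CompletableFuture<String> pending = new CompletableFuture<>();

        System.out.println(probe(ok, 50));
        System.out.println(probe(failed, 50));
        System.out.println(probe(pending, 50));
    }
}
```

Running `main` prints `UP`, `DOWN`, `DOWN`. Merely holding a reference to `failed` or `pending` would never have raised anything, which is why a check that does not call `get()` keeps reporting UP.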
Michal Foksa
  • Please, I can't access the options variable in this line: kafkaAdminClient.describeCluster(options); – James Jul 26 '21 at 14:56
  • Thank you, now it's OK. But I'm getting the error: TimeoutException Timed out waiting to send the call. – James Jul 26 '21 at 15:55
  • Foksa, I think it's an authentication problem. Do you know which authentication properties I should provide to DescribeClusterOptions? – James Jul 27 '21 at 13:31
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/235343/discussion-between-james-and-michal-foksa). – James Jul 27 '21 at 14:40
  • I'm not sure if this solution is specific to a certain version but, in my case, it doesn't work. When you stop the Kafka service after the spring-boot service starts up, this still continues to return `UP`. I'm not sure why this is happening though (I'm guessing something related to the admin client always working with KafkaFuture objects?). So I used `kafkaAdminClient.listTopics(new ListTopicsOptions().timeoutMs(5000)).names().get(5000, MILLISECONDS)` to force-wait for the result of a KafkaFuture, and now it throws a `TimeoutException` whenever the Kafka service is down. – emrekgn Nov 16 '21 at 15:23
  • describeCluster() seems to no longer throw exceptions, so the only way for that call to fail is to retrieve the results from the futures and have that retrieval fail. – Aithusa May 13 '22 at 15:51
  • Example and comments have been fixed in the answer. – Michal Foksa Jun 11 '22 at 18:09
  • I'm getting an error at KafkaAdmin. I don't have any properties.yml file; I have hardcoded everything in Java. How can I get this KafkaAdmin? – Bug Oct 26 '22 at 06:24
  • AdminClient.create throws an exception in the case of 'No resolvable bootstrap urls given in bootstrap.servers' – YerivanLazerev Jan 29 '23 at 07:37
  • @MichalFoksa And how would you write JUnit test for that with `ApplicationContextRunner` which would check if it has beans? I have a problem with `@Autowired KafkaAdmin`. – Lui Mar 10 '23 at 10:10
  • My autowired KafkaAdmin's getConfigurationProperties is returning a single bootstrap local server (looks default), although I have a full set of properties defined in application.yml. Is this a known issue to anyone? I don't want to have to load the properties manually but will if no other choice. – reactive-core Apr 10 '23 at 16:48

Use the AdminClient API to check the health of the cluster by describing the cluster and/or the topic(s) you'll be interacting with, and verifying, for example, that those topics have the required number of in-sync replicas.

> Kafka has something like KafkaHealthIndicator out of the box

It doesn't. Spring's Kafka integration might.

OneCricketeer