13

I need to write a smoke test in Java that validates whether the system is connected to Kafka.

Does anyone have any idea? I have found this post:

How to check whether Kafka Server is running?

But that approach is too complicated to do from Java code, and I don't think it's the direction I should take.

Thanks in advance.

Community
user2199630

2 Answers

15

I had the same question and didn't want to leave it unanswered. I read a lot about how to check the connection, and most of the answers I found checked the connection to ZooKeeper, but I wanted to check the connection to the Kafka server directly.

What I did was create a simple KafkaConsumer and list all the topics with listTopics(). If the connection succeeds, the call returns a map of the available topics. Otherwise, it throws a TimeoutException.

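  import java.util.Properties
  import scala.collection.mutable
  import org.apache.kafka.clients.consumer.KafkaConsumer

  def validateKafkaConnection(kafkaParams: mutable.Map[String, Object]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams("bootstrap.servers").toString)
    props.put("group.id", kafkaParams("group.id").toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    try {
      // Needs a reachable broker; throws TimeoutException if none responds in time
      simpleConsumer.listTopics()
    } finally {
      simpleConsumer.close() // always release the consumer's network resources
    }
  }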

Then you can wrap this method in a try-catch block to catch the exception.
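Since the question asks for Java, here is a minimal sketch of that try-catch wrapping, turning "the probe threw" into a boolean that a smoke test can assert on. The class name `KafkaSmokeCheck` and the method `isReachable` are hypothetical names chosen for illustration, and the probe is passed in as a `Callable` so the sketch compiles without the Kafka client on the classpath.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of Kafka): reports whether a connectivity probe succeeded. */
final class KafkaSmokeCheck {

    private KafkaSmokeCheck() {}

    /**
     * Runs the probe and returns true if it completed without throwing.
     * The probe is assumed to be a call that needs a live broker,
     * such as the listTopics() call shown above.
     */
    static boolean isReachable(Callable<?> probe) {
        try {
            probe.call();
            return true;
        } catch (Exception e) {
            // A Kafka TimeoutException, or any other failure, ends up here.
            System.err.println("Kafka connectivity check failed: " + e);
            return false;
        }
    }
}
```

With a real consumer, the call would look something like `KafkaSmokeCheck.isReachable(() -> consumer.listTopics())`, and the smoke test would assert that the result is `true`.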

dbustosp
  • 1
    You may also need to lower the default timeout values to avoid waiting too long: props.put(REQUEST_TIMEOUT_MS_CONFIG, "5000"); props.put(SESSION_TIMEOUT_MS_CONFIG, "4000"); – victor gallet May 03 '18 at 12:26
3

Edit: This was for a very old version of Kafka. Do not use this in 2023 :)


You can check if the server is running by using this:

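ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$);
// Assumption: Kafka 0.9-1.x API, where ZkUtils is built from a ZkClient; false = ZooKeeper security disabled
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());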
if (brokers.isEmpty()) {
    // No brokers available
} else {
    // There are brokers available
}
Nico
  • 4
    This is a way to check Zookeeper connection, not Kafka server. – dbustosp Nov 24 '17 at 15:46
  • Yeah, as @dbustosp mentioned, this may have a double pitfall: 1. Your application may not be able to reach Kafka even though it can reach ZK. 2. Your ZK may not be able to reach Kafka due to network or security configuration. – Zisis F Sep 09 '22 at 11:19