
I want to check whether the Kafka server is running before starting my production and consumption jobs. This is in a Windows environment, and here is my Kafka server's code in Eclipse...

Properties properties = new Properties();
properties.setProperty("broker.id", "1");
properties.setProperty("port", "9092");
properties.setProperty("log.dirs", "D://workspace//");
properties.setProperty("zookeeper.connect", "localhost:2181"); 

Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(properties);       
KafkaServer kafka = new KafkaServer(config, new CurrentTime(), option);
kafka.startup();

In this case `if (kafka != null)` is not enough, because it is always true. So is there any way to know that my Kafka server is running and ready for a producer? I need to check this because otherwise some of the first data packets are lost.

User

10 Answers


All Kafka brokers must be assigned a broker.id. On startup, a broker creates an ephemeral node in ZooKeeper with the path /brokers/ids/$id. Because the node is ephemeral, it is removed as soon as the broker disconnects, e.g. by shutting down.

You can view the list of the ephemeral broker nodes like so:

echo dump | nc localhost 2181 | grep brokers

The ZooKeeper client interface exposes a number of commands; dump lists all the sessions and ephemeral nodes for the cluster.

Note, the above assumes:

  • You're running ZooKeeper on the default port (2181) on localhost, and that localhost is the leader for the cluster
  • Your zookeeper.connect Kafka config doesn't specify a chroot env for your Kafka cluster i.e. it's just host:port and not host:port/path
Paul Carey
  • So, this actually checks to see if `zookeeper` has _at least one_ `kafka` connected. It doesn't test that _your_ `kafka` is running. It will be correct in the OP's situation, but it is an indirect test. May have to research what can be done on port 9092 for a direct test. – Jesse Chisholm Feb 07 '20 at 00:48
  • Thanks for sharing the tip. Would you share the source? I want to read more. – Cloud Cho Feb 23 '23 at 18:59
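As Jesse Chisholm's comment notes, the ZooKeeper dump only proves that *some* broker is registered. For a direct (if coarse) test against the broker itself, you can probe its listener port with a plain TCP connect. A minimal sketch in Java, assuming the broker listens on localhost:9092 — note that an open port only proves a listener exists, not that Kafka is healthy:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortProbe {
    // Returns true if something is accepting TCP connections on host:port.
    // This only proves a listener is present; it does not validate the
    // Kafka protocol or broker readiness.
    public static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, `BrokerPortProbe.isPortOpen("localhost", 9092, 1000)` returns false immediately when nothing is bound to the port.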

You can install the kafkacat tool on your machine. For example, on Ubuntu you can install it using:

apt-get install kafkacat

Once kafkacat is installed, you can use the following command to connect:

kafkacat -b <your-ip-address>:<kafka-port> -t test-topic
  • Replace <your-ip-address> with your machine's IP
  • Replace <kafka-port> with the port Kafka is running on. Normally it is 9092

If kafkacat is able to make the connection when you run the above command, it means that Kafka is up and running.

selftaught91
  • kafkacat -b localhost:9092 -L // as per https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html, -L displays the current state of the Kafka cluster and its topics, partitions, replicas ... – AmerS Sep 03 '21 at 01:05
  • For anyone on a mac: - brew install kafkacat #usage kcat -b -t -p – Josh Mar 13 '23 at 18:03

I used the AdminClient API, wrapped in a method so it compiles and returns false when the brokers cannot be reached within the timeout:

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;

public static boolean isKafkaAvailable()
{
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("connections.max.idle.ms", 10000);
    properties.put("request.timeout.ms", 5000);
    try (AdminClient client = KafkaAdminClient.create(properties))
    {
        ListTopicsResult topics = client.listTopics();
        Set<String> names = topics.names().get();
        if (names.isEmpty())
        {
            // case: no topic found
        }
        return true;
    }
    catch (InterruptedException | ExecutionException e)
    {
        // Kafka is not available
        return false;
    }
}
Mohammad Faisal
  • This seems not correct, as it returns true when only one broker is up. – Leon Nov 04 '19 at 08:32
  • @Leon can you elaborate more on your comment? – Mohammad Faisal Nov 14 '19 at 12:49
  • If you have >=3 brokers and replica is 3, the code returns true if 1 broker is up while the other >=2 are down. Actually it requires 1 zk node only and you can get topic name list without any broker up running. – Leon Nov 23 '19 at 13:59
  • 2
    @Leon I find this answer valuable. Of course you need to understand what it is that you are actually "monitoring". Probing Zookeeper and probing the bootstrap servers as here are going to answer two different sets of questions. I argue that from a client perspective connecting to the bootstrap server(s) is the right thing to do. Even though you should have dedicated monitoring for the Kafka cluster (ZK and nodes) it makes sense to verify that a specific client can actually connect to the cluster. – Marcel Stör Jan 20 '21 at 13:24
  • 2
    This should be marked as the accepted answer! – raikumardipak Oct 08 '21 at 10:35

On Linux, run "ps aux | grep kafka" and see whether the Kafka server properties file appears in the results, e.g. /path/to/kafka/server.properties.
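The same process-table check can be done programmatically from Java (and works on Windows too) with the ProcessHandle API available since Java 9. A sketch — matching on the substring "kafka" is an assumption, and like the ps approach it only proves a broker process exists, not that it is ready:

```java
import java.util.Locale;

public class KafkaProcessCheck {
    // Returns true if any visible process has "kafka" in its command line.
    // Assumption: the broker was started with "kafka" somewhere in its
    // command line (true for the standard startup scripts).
    public static boolean kafkaProcessRunning() {
        return ProcessHandle.allProcesses()
                .map(ph -> ph.info().commandLine().orElse(""))
                .anyMatch(cmd -> cmd.toLowerCase(Locale.ROOT).contains("kafka"));
    }
}
```

Note that `commandLine()` may be empty for processes the JVM lacks permission to inspect, so this check can miss brokers running under another user.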

Raffy Arnaez

Paul's answer is very good, and it is actually how Kafka and ZooKeeper work together from a broker's point of view.

Another easy option to check whether a Kafka server is running is to create a simple KafkaConsumer pointing to the cluster and try some action, for example listTopics(). If the Kafka server is not running, you will get a TimeoutException, which you can handle with a try-catch statement.

  import java.util.Properties
  import scala.collection.mutable
  import org.apache.kafka.clients.consumer.KafkaConsumer

  def validateKafkaConnection(kafkaParams: mutable.Map[String, Object]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams("bootstrap.servers").toString)
    props.put("group.id", kafkaParams("group.id").toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    try simpleConsumer.listTopics() // throws TimeoutException if the cluster is unreachable
    finally simpleConsumer.close()
  }
dbustosp
  • I started the zookeeper only not the Kafka and then executed the `kafka-topics.bat --list` and I got all the topics. It seems that only listing topics could not verify that Kafka is running. – Mohammad Faisal Apr 04 '18 at 07:35
  • @MohammadFaisal That command will get the information metadata from Zookeeper (https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/admin/TopicCommand.scala#L115-L119). The solution I provided is getting the information from Kafka Cluster (if you follow the source code, you will get to this: https://github.com/apache/kafka/blob/0.10.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L203-L209). That is because you are not getting any error when Kafka cluster is down. – dbustosp Apr 16 '18 at 02:27

A good option is to use AdminClient as below before starting to produce or consume messages:

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;

try (AdminClient client = AdminClient.create(properties)) {
    client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get();
} catch (InterruptedException | ExecutionException ex) {
    LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS);
    return;
}
Mohammad Faisal

Firstly, you need to create an AdminClient bean:

 @Bean
 public AdminClient adminClient() {
   Map<String, Object> configs = new HashMap<>();
   configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
       StringUtils.arrayToCommaDelimitedString(new Object[]{"your bootstrap server address"}));
   return AdminClient.create(configs);
 }

Then, you can use this script:

while (true) {
    Map<String, ConsumerGroupDescription> groupDescriptionMap =
            adminClient.describeConsumerGroups(Collections.singletonList(groupId))
                    .all()
                    .get(10, TimeUnit.SECONDS);

    ConsumerGroupDescription consumerGroupDescription = groupDescriptionMap.get(groupId);

    log.debug("Kafka consumer group ({}) state: {}",
            groupId,
            consumerGroupDescription.state());

    if (consumerGroupDescription.state().equals(ConsumerGroupState.STABLE)) {
        boolean isReady = true;
        for (MemberDescription member : consumerGroupDescription.members()) {
            if (member.assignment() == null || member.assignment().topicPartitions().isEmpty()) {
                isReady = false;
            }
        }

        if (isReady) {
            break;
        }
    }

    log.debug("Kafka consumer group ({}) is not ready. Waiting...", groupId);
    TimeUnit.SECONDS.sleep(1);
}

This script checks the state of the consumer group every second until the state is STABLE. Because all consumers are then assigned to topic partitions, you can conclude that the server is running and ready.
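The one-second polling pattern above can be factored into a small reusable helper. A stdlib-only sketch (the names here are illustrative, not part of any Kafka API); any readiness probe, such as the consumer-group check, can be passed in as the condition:

```java
import java.util.function.BooleanSupplier;

public class WaitUntil {
    // Polls `condition` every pollMs until it returns true or timeoutMs
    // elapses; returns the final result of the condition.
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollMs);
        }
        return condition.getAsBoolean();
    }
}
```

Usage would look like `WaitUntil.waitUntil(() -> groupIsStable(adminClient, groupId), 30_000, 1_000)`, where `groupIsStable` is the check from the loop above extracted into a method.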

Abzelhan

Adding to the answer above, for anyone on a Mac:

brew install kafkacat

# usage
kcat -b <broker> -t <topic> -p <partition>

Josh

You can use the code below to check for available brokers if the server is running:

import org.I0Itec.zkclient.ZkClient;
import kafka.utils.ZkUtils;

public static boolean isBrokerRunning() {
    boolean flag = false;
    // optionally pass kafka.utils.ZKStringSerializer$.MODULE$ as a third argument
    ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 10000);
    if (zkClient != null) {
        int brokersCount = zkClient.countChildren(ZkUtils.BrokerIdsPath());
        if (brokersCount > 0) {
            logger.info("Following broker(s) {} is/are available on ZooKeeper.",
                    zkClient.getChildren(ZkUtils.BrokerIdsPath()));
            flag = true;
        } else {
            logger.error("No broker is available on ZooKeeper.");
        }
        zkClient.close();
    }
    return flag;
}
usman
  • Here, If I have multiple brokers running already, then false results would be returned. Your solution may work accurately for single broker environment. – User Feb 15 '19 at 10:43
  • Yes, for single broker it is perfect. BTW you did not mention for multiple. let me give it a try then. you can check if any one of the broker is up do not check for others, so as long as any broker is up you can skip others. On the other hand you still need to check for the last running broker as first N brokers may be down in very unfortunate case. – usman Feb 17 '19 at 16:22

I found an OnError event in Confluent Kafka (the .NET client):

consumer.OnError += Consumer_OnError;

private void Consumer_OnError(object sender, Error e)
{
    Debug.Log("connection error: " + e.Reason);
    ConsumerConnectionError(e);
}

And its documentation in code:

    //
    // Summary:
    //     Raised on critical errors, e.g. connection failures or all brokers down. Note
    //     that the client will try to automatically recover from errors - these errors
    //     should be seen as informational rather than catastrophic
    //
    // Remarks:
    //     Executes on the same thread as every other Consumer event handler (except OnLog
    //     which may be called from an arbitrary thread).
    public event EventHandler<Error> OnError;
Muhammad Faizan Khan