1

I am trying to create a Kafka topic using Java API, but getting LEADER is NOT AVAILABLE.

Code:

int partition = 0;
        ZkClient zkClient = null;
        try {
            String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);

            String topicName = "mdmTopic5";
            int noOfPartitions = 2;
            int noOfReplication = 1;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkClient, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }

Error:

[2017-10-19 12:14:42,263] WARN Error while fetching metadata with correlation id 1 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,370] WARN Error while fetching metadata with correlation id 3 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,479] WARN Error while fetching metadata with correlation id 4 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Does Kafka 0.11.0.1 supports AdminUtils.??? Please let me know how to create topic in this version.

Thanks in Advance.

2 Answers2

2

Since Kafka 0.11 there is a proper Admin API for creating (and deleting) topics and I'd recommend to use it instead of directly connecting to Zookeeper.

See AdminClient.createTopics(): http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics(java.util.Collection)

Mickael Maison
  • 25,067
  • 7
  • 71
  • 68
1

Generally LEADER NOT AVAILABLE points to network issues rather than issues with your code. Try:

telnet host port to see if you can connect to all required hosts/ports from your machine.

However, the latest approach is to use the BOOTSTRAP_SERVERS while creating topics.

A working version of topic creation code using scala would be as follows:

Import the required kafka-clients using sbt.

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1")

The code for topic creation in scala:

import java.util.Arrays
import java.util.Properties

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

class CreateKafkaTopic {
  def create(): Unit = {
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")

    val localKafkaAdmin = AdminClient.create(config)

    val partitions = 3
    val replication = 1.toShort
    val topic = new NewTopic("integration-02", partitions, replication)
    val topics = Arrays.asList(topic)

    val topicStatus = localKafkaAdmin.createTopics(topics).values()
    //topicStatus.values()
    println(topicStatus.keySet())
  }

}

Hope it helps.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
ForeverLearner
  • 1,901
  • 2
  • 28
  • 51