4

I am programming a client to work with kafka 0.9. I want to know how to create a topic. This answer: How to create a Topic in Kafka through Java is similar to what I am asking. Except, that solution only works for Kafka 0.8.2 which is hugely different from Kafka 0.9's API.

Community
  • 1
  • 1
Chee Loong Soon
  • 3,601
  • 3
  • 17
  • 21
  • In other words, you tried similar solution and it doesn't work, does it? Please, describe what you tried and problems you encountered. – George Sovetov Apr 01 '16 at 22:46
  • I am working for a company. It runs Kafka 0.8.2. I implemented it based on the link. Now the company wants to upgrade to Kafka 0.9. I needed a quick solution to upgrade my code to 0.9. – Chee Loong Soon Apr 02 '16 at 23:18

3 Answers3

9

I tried following Soon Chee Loong's answer with Kafka 0.9.0.1 but had to make one change. ZKStringSerializer is now private. To create ZkUtils I used the following API (it creates a ZkClient internally):

ZkUtils.apply(
    "zookeeper1:port1,zookeeper2:port2",
    sessionTimeoutMs,
    connectionTimeoutMs,
    false)
Avi
  • 351
  • 1
  • 4
8

After looking through the scala api and various links online.

This is the solution I found:

Maven Dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

Code:

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

    public static void main(String[] args) {
        String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
        int sessionTimeoutMs = 10 * 1000;
        int connectionTimeoutMs = 8 * 1000;

        ZkClient zkClient = new ZkClient(
            zookeeperConnect,
            sessionTimeoutMs,
            connectionTimeoutMs,
            ZKStringSerializer$.MODULE$);

       // Security for Kafka was added in Kafka 0.9.0.0
       boolean isSecureKafkaCluster = false;
       // ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API
       ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

       String topic = "my-topic";
       int partitions = 2;
       int replication = 3;

       // Add topic configuration here
       Properties topicConfig = new Properties();

       AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
       zkClient.close();
    }
}

If you are wondering why the code below doesn't look like Java:

ZKStringSerializer$.MODULE$

It is because ZkStringSerializer is a Scala Object. You can read more information about that here:

How create Kafka ZKStringSerializer in Java?

Note: You must initialize the ZkClient with ZKStringSerializer.
If you don't, then createTopic() will only seem to work (In other words: it will return without error).
The topic will exist in only Zookeeper and only works when listing topics. i.e. list command below works fine

bin/kafka-topics.sh --list --zookeeper localhost:2181

but Kafka itself does not create the topic. To illustrate, the describe command below will throw an error.

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Therefore, make sure you initialize it with ZKStringSerializer$.MODULE$.

References: How Can we create a topic in Kafka from the IDE using API‌​from-the-ide-using-api

Soon Chee Loong, University of Toronto

Community
  • 1
  • 1
Chee Loong Soon
  • 3,601
  • 3
  • 17
  • 21
  • 1
    The original link reference is [http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api](http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api) not http://www.askdaima.com/question/0e3b996eda49e3e4 – Jaya Ananthram Apr 05 '16 at 07:11
  • @JayaAnanthram Fixed, thanks. I did not know of the original link reference. – Chee Loong Soon Apr 06 '16 at 20:35
1

For Kafka 0.9 and above you need to use new API AdminZkClient. AdminUtils API is getting deprecated.

String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

You can check more details at this link

Mahesh Mogal
  • 638
  • 9
  • 13