I am programming a client to work with Kafka 0.9 and want to know how to create a topic. The answer to "How to create a Topic in Kafka through Java" is close to what I am asking, except that its solution only works for Kafka 0.8.2, whose API is very different from Kafka 0.9's.
In other words, you tried a similar solution and it doesn't work, does it? Please describe what you tried and the problems you encountered. – George Sovetov Apr 01 '16 at 22:46
I am working for a company. It runs Kafka 0.8.2. I implemented it based on the link. Now the company wants to upgrade to Kafka 0.9. I needed a quick solution to upgrade my code to 0.9. – Chee Loong Soon Apr 02 '16 at 23:18
3 Answers
I tried following Soon Chee Loong's answer with Kafka 0.9.0.1 but had to make one change. ZKStringSerializer is now private. To create ZkUtils I used the following API (it creates a ZkClient internally):
ZkUtils.apply(
    "zookeeper1:port1,zookeeper2:port2",
    sessionTimeoutMs,
    connectionTimeoutMs,
    false)

After looking through the Scala API and various links online, this is the solution I found:
Maven Dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>
Code:
import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {
    public static void main(String[] args) {
        String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
        int sessionTimeoutMs = 10 * 1000;
        int connectionTimeoutMs = 8 * 1000;

        // The serializer argument is required; see the note below.
        ZkClient zkClient = new ZkClient(
            zookeeperConnect,
            sessionTimeoutMs,
            connectionTimeoutMs,
            ZKStringSerializer$.MODULE$);

        // Security for Kafka was added in Kafka 0.9.0.0
        boolean isSecureKafkaCluster = false;
        // Since Kafka 0.9.0.0, the AdminUtils API takes a ZkUtils instead of a ZkClient
        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

        String topic = "my-topic";
        int partitions = 2;
        // The replication factor cannot exceed the number of available brokers
        int replication = 3;

        // Add topic configuration here
        Properties topicConfig = new Properties();

        AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
        zkClient.close();
    }
}
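The empty topicConfig in the code above is where per-topic overrides go. A minimal sketch, assuming you want to set a retention time and a cleanup policy: the class name TopicConfigSketch and the chosen values are illustrative and not part of the original answer, while retention.ms and cleanup.policy are standard Kafka topic-level settings.

```java
import java.util.Properties;

final class TopicConfigSketch {
    // Builds topic-level overrides to pass as the last argument of AdminUtils.createTopic.
    static Properties buildTopicConfig() {
        Properties topicConfig = new Properties();
        // Keep messages for one day (value in milliseconds, given as a string).
        topicConfig.setProperty("retention.ms", String.valueOf(24L * 60 * 60 * 1000));
        // Delete old log segments rather than compacting them.
        topicConfig.setProperty("cleanup.policy", "delete");
        return topicConfig;
    }

    public static void main(String[] args) {
        System.out.println(buildTopicConfig());
    }
}
```

The resulting Properties object would replace the empty one passed to createTopic.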
If you are wondering why the following expression doesn't look like Java:
ZKStringSerializer$.MODULE$
It is because ZKStringSerializer is a Scala object. You can read more about that here:
How create Kafka ZKStringSerializer in Java?
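To make the MODULE$ syntax less mysterious, here is a hand-written Java analogue of what the Scala compiler roughly emits for an object. The names Greeter$, greet and ModuleDemo are hypothetical and only for illustration; this is a sketch of the pattern, not Kafka's actual generated code.

```java
// Rough Java equivalent of a Scala `object Greeter { def greet(name: String) = ... }`.
final class Greeter$ {
    // The single instance; Scala exposes it under the name MODULE$.
    public static final Greeter$ MODULE$ = new Greeter$();

    private Greeter$() {
        // Private constructor: no second instance can be created from outside.
    }

    public String greet(String name) {
        return "Hello, " + name;
    }
}

final class ModuleDemo {
    public static void main(String[] args) {
        // From Java, the singleton is reached through the static MODULE$ field,
        // the same way ZKStringSerializer$.MODULE$ is used above.
        System.out.println(Greeter$.MODULE$.greet("Kafka"));
    }
}
```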
Note: You must initialize the ZkClient with ZKStringSerializer.
If you don't, createTopic() will only appear to work: it returns without error,
but the topic exists only in ZooKeeper and only shows up when listing topics.
That is, the list command below works fine:
bin/kafka-topics.sh --list --zookeeper localhost:2181
but Kafka itself does not create the topic. To illustrate, the describe command below throws an error:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Therefore, make sure you initialize it with ZKStringSerializer$.MODULE$.
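The likely reason the serializer matters: when no serializer is given, ZkClient falls back to Java object serialization, so the data written to ZooKeeper is not the plain UTF-8 text that Kafka reads. The following stdlib-only sketch (no Kafka or ZkClient classes involved; the class and method names are made up for illustration) compares the two encodings of the same string.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class SerializerComparison {
    // Plain UTF-8 bytes, the kind of encoding a string serializer produces.
    static byte[] asUtf8(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Java object serialization, which prepends a stream header and type information.
    static byte[] asJavaSerialized(String data) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        String json = "{\"version\":1}";
        System.out.println("UTF-8 length:           " + asUtf8(json).length);
        System.out.println("Java-serialized length: " + asJavaSerialized(json).length);
    }
}
```

The Java-serialized form starts with the serialization stream header instead of the text itself, so a reader expecting a UTF-8 string cannot parse it.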
References: How Can we create a topic in Kafka from the IDE using API
Soon Chee Loong, University of Toronto

The original link reference is [http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api](http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api), not http://www.askdaima.com/question/0e3b996eda49e3e4 – Jaya Ananthram Apr 05 '16 at 07:11
@JayaAnanthram Fixed, thanks. I did not know of the original link reference. – Chee Loong Soon Apr 06 '16 at 20:35
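For newer Kafka versions, use the AdminZkClient API; the AdminUtils API is being deprecated. Note that AdminZkClient and KafkaZkClient do not ship with Kafka 0.9 itself, so this only applies after upgrading to a release that includes them.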
import java.util.Properties;

import kafka.admin.RackAwareMode;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.utils.Time;

String zookeeperHost = "127.0.0.1:2181";
boolean isSecure = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";

KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost, isSecure, sessionTimeoutMs,
    connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1, partitions, replication,
    topicConfig, RackAwareMode.Disabled$.MODULE$);
You can check more details at this link
