1

One of Kafka new change in the upgrade from 2.3.0 to 2.5.0 is removing ZkUtils (see https://issues.apache.org/jira/browse/KAFKA-8545)

what is the best practice to remove the use and which package should I use instead

public void createTopic(String topicName, int partitions, int replicationFactor) {
    DataExportConfig conf = ApplicationContextProvider.getApplicationContext().getBean("dataExportConfig", DataExportConfig.class);
    String zKaddress = conf.getZkHost();
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils ZkUtils.apply(zKaddress, zkSessionTimeoutMs, zkConnectionTimeOutInMs, isSecureKafkaCluster);
    try {
        if (!AdminUtils.topicExists(zkUtils, topicName)) {
            AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties(), RackAwareMode.Enforced$.MODULE$);
            //log
        } else {
            //log
        }

        List<String> topicList = Arrays.asList(topicName);
        // checking real partition size for topic
        Integer topicPartitionSizeInZooKeeper = JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(JavaConversions.asScalaBuffer(topicList))).get(topicName).size();
        if (topicPartitionSizeInZooKeeper != partitions) {
            //log
    } catch (Exception ex) {
            //log
        throw ex;
    } finally {
        //close zookeeper client after all topics are created
        zkUtils.zkClient().close();
    }
}
Barak
  • 23
  • 6

1 Answers1

0

You now need to use the Admin API createTopics() method to create topics:

int partitions = 1;
short replicationFactor = 3;

Properties props = new Properties();
props.put("bootstrap.servers", "localshot:9092");
AdminClient admin = AdminClient.create(props);

NewTopic topic = new NewTopic("mytopic", partitions, replicationFactor);
Collection<NewTopic> newTopics = Arrays.asList(topic);
CreateTopicsResult ctr = admin.createTopics(newTopics);
ctr.all().get();
Mickael Maison
  • 25,067
  • 7
  • 71
  • 68
  • How do I configure the timeouts and the secure params like in the ZkUtils.apply? – Barak Jul 22 '20 at 10:06
  • You can add any of the admin configs (http://kafka.apache.org/documentation/#adminclientconfigs) to `props` to configure security and timeouts. You can also use a `CreateTopicsOptions` object via http://kafka.apache.org/25/javadoc/org/apache/kafka/clients/admin/Admin.html#createTopics-java.util.Collection-org.apache.kafka.clients.admin.CreateTopicsOptions- – Mickael Maison Jul 22 '20 at 11:45
  • thanks. It was very helpful – Barak Jul 22 '20 at 14:16