Using the 1.0.0 Kafka admin client I wish to programmatically create a topic on the broker. I happen to be using Scala. I've tried using the following code to either create a topic on the Kafka broker or simply to list the available topics
import org.apache.kafka.clients.admin.{AdminClient, ListTopicsOptions, NewTopic}
import scala.collection.JavaConverters._
val zkServer = "localhost:2181"
val topic = "test1"
val zookeeperConnect = zkServer
val sessionTimeoutMs = 10 * 1000
val connectionTimeoutMs = 8 * 1000
val partitions = 1
val replication:Short = 1
val topicConfig = new Properties() // add per-topic configurations settings here
import org.apache.kafka.clients.admin.AdminClientConfig
val config = new Properties
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, zkServer)
val admin = AdminClient.create(config)
val existing = admin.listTopics(new ListTopicsOptions().timeoutMs(500).listInternal(true))
val nms = existing.names()
nms.get().asScala.foreach(nm => println(nm)) // nms.get() fails
val newTopic = new NewTopic(topic, partitions, replication)
newTopic.configs(Map[String,String]().asJava)
val ret = admin.createTopics(List(newTopic).asJavaCollection)
ret.all().get() // Also fails
admin.close()
With either command, the ZooKeeper (3.4.10) side throws an EOFException and closes the connection. Debugging the ZooKeeper side itself, it seems it is unable to deserialize the message that the admin client is sending (it runs out of bytes it is trying to read)
Anyone able to make the 1.0.0 Kafka admin client work for creating or listing topics?