I am migrating a Play v 2.3.4 app to Play v 2.5.4. Along the way I had to also upgrade to Scala 2.11.8 and kafka 9.0+ to support the updated Play version.
Most of the issues I have worked out but I cannot figure out a Kafka issue with some code that manages Kafka topics though AdminUtils. The troubles are all centered around kafka.utils.ZkStringSerialzier.
I am using org.I0Itec.zkclient package to instances ZkClient object that is passed in the construction of ZkUtils object but it fails because it cannot resolve my ZkStringSerializer.
Related code is:
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false
val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)
AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}
The above code results in the error that ZKStringSerializer is inaccessible from his place.
I found several related post to creating topics (mostly in Java and before Kafka 9.0) Creating a topic for Apache Kafka 0.9 Using Java How create Kafka ZKStringSerializer in Java? How Can we create a topic in Kafka from the IDE using API And Finally Creating a Kafka topic results in no leader
Based on these I updated by code as follows:
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer$
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
var zkSerializer: ZKStringSerializer = ZKStringSerializer$.MODULE$
val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
val topicConfig: Properties = new Properties()
val isSecureKafkaCluster: Boolean = false
val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)
AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
zkClient.close()
}
}
And then I just get unable to resolve symbol ZkStringSerialzer$ errors.
I tried both with the org.I0Itec.zkclient.serialize.ZkSerializer object as well and it did not make a difference.
So my Question is actually two fold: 1. What is the significance of the '$' character for the Import and Declarations statements in scala. I have used it in string interpolation ( e/g/ s"var value is $var")to reference variables but this seems different. 2. What is wrong with my code. Is it the way I am importing, declaring, something else?
I am new to scala and Play but I am feeling like quite and idiot at the moment so any advice / help is appreciated
~Dave
P.S. In case it helps relevant bits from project files
build.sbt:
lazy val `api` = (project in file(".")).enablePlugins(PlayScala)
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.11" % "0.9.0.1",
jdbc,
cache,
ws,
specs2 % Test
)
plugins.sbt:
resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.4")
addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-jshint" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-rjs" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-digest" % "1.0.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-mocha" % "1.0.0")
build.properties:
sbt.version=0.13.5