0

I am migrating a Play v 2.3.4 app to Play v 2.5.4. Along the way I had to also upgrade to Scala 2.11.8 and kafka 9.0+ to support the updated Play version.

Most of the issues I have worked out but I cannot figure out a Kafka issue with some code that manages Kafka topics though AdminUtils. The troubles are all centered around kafka.utils.ZkStringSerialzier.

I am using org.I0Itec.zkclient package to instances ZkClient object that is passed in the construction of ZkUtils object but it fails because it cannot resolve my ZkStringSerializer.

Related code is:

import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
        var zkSerializer: ZKStringSerializer = ZKStringSerializer
        val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
        val topicConfig: Properties = new Properties()
        val isSecureKafkaCluster: Boolean = false

        val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)

        AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
        zkClient.close()
    }
}

The above code results in the error that ZKStringSerializer is inaccessible from his place.

I found several related post to creating topics (mostly in Java and before Kafka 9.0) Creating a topic for Apache Kafka 0.9 Using Java How create Kafka ZKStringSerializer in Java? How Can we create a topic in Kafka from the IDE using API And Finally Creating a Kafka topic results in no leader

Based on these I updated by code as follows:

import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer$
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
object Topic {
    def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
        var zkSerializer: ZKStringSerializer = ZKStringSerializer$.MODULE$
        val zkClient: ZkClient= new ZkClient(zookeeperHosts, connectionTimeoutMs, sessionTimeoutMs, zkSerializer)
        val topicConfig: Properties = new Properties()
        val isSecureKafkaCluster: Boolean = false

        val zkUtils: ZkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), isSecureKafkaCluster)

        AdminUtils.createTopic(zkUtils, topic, partitionSize, replicationCount, topicConfig)
        zkClient.close()
    }
}

And then I just get unable to resolve symbol ZkStringSerialzer$ errors.

I tried both with the org.I0Itec.zkclient.serialize.ZkSerializer object as well and it did not make a difference.

So my Question is actually two fold: 1. What is the significance of the '$' character for the Import and Declarations statements in scala. I have used it in string interpolation ( e/g/ s"var value is $var")to reference variables but this seems different. 2. What is wrong with my code. Is it the way I am importing, declaring, something else?

I am new to scala and Play but I am feeling like quite and idiot at the moment so any advice / help is appreciated

~Dave

P.S. In case it helps relevant bits from project files

build.sbt:

lazy val `api` = (project in file(".")).enablePlugins(PlayScala)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka_2.11" % "0.9.0.1",
  jdbc,
  cache,
  ws,
  specs2 % Test
)

plugins.sbt:

resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.4")

addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-jshint" % "1.0.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-rjs" % "1.0.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-digest" % "1.0.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-mocha" % "1.0.0")

build.properties:

sbt.version=0.13.5
Community
  • 1
  • 1
DVS
  • 783
  • 7
  • 25

2 Answers2

0

After fighting this issue over the weekend I gave up on the ZKClient package that had been used previously and simple used Kafka directly which was actually much cleaner that trying to use the I0Itech ZKClient.

New implementation goes like this:

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils

class Topic {
  def CreateKafkaTopic(topic: String, zookeeperHosts: String, partitionSize: Int, replicationCount: Int, connectionTimeoutMs: Int = 10000, sessionTimeoutMs: Int = 10000): Boolean = {
    if (ListKafkaTopics(zookeeperHosts).contains(topic) ) {
      return false
    }
    val zkUtils = ZkUtils.apply(zookeeperHosts, sessionTimeoutMs, connectionTimeoutMs, false)
    AdminUtils.createTopic( zkUtils, topic, partitionSize, replicationCount, new Properties())
    zkUtils.close()
    true
  }
}

End the end removed a dependency and make cleaner code so a double win I suppose.

~Dave

DVS
  • 783
  • 7
  • 25
  • All you did as for creating zkUtils is to replace `ZkStringSerializer` with `false`. How does that work? – Eric Apr 28 '17 at 04:34
  • @user2418202 My original problem was trying to instantiate the ZkSerializer to pass to the I0Itec ZkClient. I stopped trying to use the org.I0Itec.zkclient package to create a ZKClient and let it all be handled by the Kafka ZkUtils directly. – DVS Jun 19 '17 at 12:16
0

The reason for this problem is the ZkStringSerialzer is declared as private, just use ZkUtils.createZkClient instead as follows:

    ZkUtils.createZkClient(zookeeperHosts, sessionTimeoutMs, connectionTimeoutMs)