I am working with the new Kafka-scala-streams api recently opensourced by lightbend. And I am trying to run two streams. But Whats happening is two of them don't run simultaneously and I am not getting the desired output.
package in.internity
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write
import scala.util.Try
/**
* @author Shivansh <shiv4nsh@gmail.com>
* @since 8/1/18
*/
object Boot extends App {
implicit val formats: DefaultFormats.type = DefaultFormats
val config: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p
}
val streams1 = wordSplit("lines", "wordCount")
val streams2 = readAndWriteJson("person", "personName")
private def wordSplit(intopic: String, outTopic: String) = {
val builder = new StreamsBuilderS()
val produced = Produced.`with`(Serdes.String(), Serdes.String())
val textLines: KStreamS[String, String] = builder.stream(intopic)
val data: KStreamS[String, String] = textLines.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
data.to(outTopic, produced)
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams
}
private def readAndWriteJson(intopic: String, outTopic: String) = {
val builder = new StreamsBuilderS()
val produced = Produced.`with`(Serdes.String(), Serdes.String())
val textLines: KStreamS[String, String] = builder.stream(intopic)
val data: KStreamS[String, String] = textLines.mapValues(value => {
val person = Try(parse(value).extract[Person]).toOption
println("1::", person)
val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
println("2::", personNameAndEmail)
write(personNameAndEmail)
})
data.to(outTopic, produced)
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams
}
streams1.start()
streams2.start()
Runtime.getRuntime.addShutdownHook(new Thread(() => {
streams2.close(10, TimeUnit.SECONDS)
streams1.close(10, TimeUnit.SECONDS)
}))
}
case class Person(name: String, age: Int, email: String)
case class PersonNameAndEmail(name: String, email: String)
When I run this and produce Messages on topic person
they do not get consumed.
But When I change the ordering of their start i.e
streams2.start()
streams1.start()
It works fine. So why is starting of One stream blocks the other .Can't we run multiple streams at the same time.