0

I am working with the new kafka-streams-scala API recently open-sourced by Lightbend, and I am trying to run two streams. However, the two of them don't run simultaneously, and I am not getting the desired output.

package in.internity

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write

import scala.util.Try

/**
  * Entry point that builds two separate Kafka Streams topologies and starts one
  * `KafkaStreams` instance for each of them.
  *
  * NOTE(review): both instances are constructed from the same `config`, so they
  * share the application id "wordcount-application". Presumably this places both
  * instances in the same consumer group although they read different topics --
  * confirm against the Kafka Streams `application.id` documentation.
  *
  * @author Shivansh <shiv4nsh@gmail.com>
  * @since 8/1/18
  */
object Boot extends App {
  // json4s formats, picked up implicitly by `extract` and `write` below.
  implicit val formats: DefaultFormats.type = DefaultFormats
  // Streams configuration shared by every KafkaStreams instance created in this object.
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  // Each call below builds its own topology and returns a not-yet-started KafkaStreams.
  val streams1 = wordSplit("lines", "wordCount")
  val streams2 = readAndWriteJson("person", "personName")

  /**
    * Builds a KafkaStreams instance that lower-cases every value read from `intopic`,
    * splits it on runs of non-word characters and writes each piece to `outTopic`.
    *
    * @param intopic  topic to read lines from
    * @param outTopic topic to write the split words to
    * @return the constructed KafkaStreams instance; it is not started here
    */
  private def wordSplit(intopic: String, outTopic: String) = {
    // A builder local to this method: the resulting topology contains only this pipeline.
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
    data.to(outTopic, produced)

    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams
  }

  /**
    * Builds a KafkaStreams instance that parses every value read from `intopic` as a
    * JSON `Person`, keeps only the name and email, and writes the re-serialized
    * result to `outTopic`.
    *
    * @param intopic  topic to read JSON values from
    * @param outTopic topic to write the serialized result to
    * @return the constructed KafkaStreams instance; it is not started here
    */
  private def readAndWriteJson(intopic: String, outTopic: String) = {
    // A second, independent builder: this topology is separate from the one in wordSplit.
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues(value => {
      // Any failure while parsing or extracting is collapsed to None.
      val person = Try(parse(value).extract[Person]).toOption
      // NOTE(review): println is called with two arguments; Scala 2 adapts them into a
      // single tuple, so the tuple's toString is what gets printed -- confirm intended.
      println("1::", person)
      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
      println("2::", personNameAndEmail)
      // NOTE(review): this serializes the Option itself; verify what json4s emits for None.
      write(personNameAndEmail)
    })
    data.to(outTopic, produced)

    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams
  }

  // Start order as posted: streams1 first, then streams2.
  streams1.start()
  streams2.start()
  // On JVM shutdown, close both instances, waiting up to 10 seconds for each.
  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams2.close(10, TimeUnit.SECONDS)
    streams1.close(10, TimeUnit.SECONDS)
  }))
}

/** A person record, as extracted from the JSON values read by `readAndWriteJson`. */
final case class Person(name: String, age: Int, email: String)

/** The name and email of a [[Person]]; this is the value that gets serialized to the output topic. */
final case class PersonNameAndEmail(name: String, email: String)

When I run this and produce messages on the topic `person`, they do not get consumed. But when I change the order in which the streams are started, i.e.

// Swapped order: streams2 is started before streams1.
streams2.start()
streams1.start()

it works fine. So why does starting one stream block the other? Can't we run multiple streams at the same time?

Shivansh
  • 3,454
  • 23
  • 46

1 Answer

0

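Got it working. It seems I was initializing the stream twice, once in each of the two methods (silly of me :P). The working version below registers both pipelines on a single shared builder and starts only one `KafkaStreams` instance.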

Working code :

package in.internity

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write

import scala.util.Try

/**
  * Entry point that registers two independent pipelines on one shared
  * `StreamsBuilderS` and runs them inside a single `KafkaStreams` instance.
  *
  * @author Shivansh <shiv4nsh@gmail.com>
  * @since 8/1/18
  */
object Boot extends App {
  // json4s formats, picked up implicitly by `extract` and `write` below.
  implicit val formats: DefaultFormats.type = DefaultFormats

  // Streams configuration for the single KafkaStreams instance created below.
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  // Shared by both pipeline methods so that they end up in the same topology.
  val builder = new StreamsBuilderS()

  /**
    * Registers a pipeline on the shared builder that lower-cases every value read
    * from `intopic`, splits it on runs of non-word characters and writes each
    * piece to `outTopic`.
    *
    * @param intopic  topic to read lines from
    * @param outTopic topic to write the split words to
    */
  private def wordSplit(intopic: String, outTopic: String): Unit = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val words: KStreamS[String, String] =
      textLines.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
    words.to(outTopic, produced)
  }

  /**
    * Registers a pipeline on the shared builder that parses every value read from
    * `intopic` as a JSON `Person`, keeps only the name and email, and writes the
    * re-serialized result to `outTopic`.
    *
    * @param intopic  topic to read JSON values from
    * @param outTopic topic to write the serialized result to
    */
  private def readAndWriteJson(intopic: String, outTopic: String): Unit = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues { value =>
      // Any failure while parsing or extracting is collapsed to None.
      val person = Try(parse(value).extract[Person]).toOption
      // Interpolated into one string: passing two arguments to println relies on
      // deprecated argument adaptation and prints a tuple instead.
      println(s"1:: $person")
      val personNameAndEmail = person.map(p => PersonNameAndEmail(p.name, p.email))
      println(s"2:: $personNameAndEmail")
      // NOTE(review): this serializes the Option itself; verify what json4s emits for None.
      write(personNameAndEmail)
    }
    data.to(outTopic, produced)
  }

  // Both pipelines must be registered before the topology is built.
  wordSplit("lines", "wordCount")
  readAndWriteJson("person", "personName")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
  streams.start()

  // On JVM shutdown, close the instance, waiting up to 10 seconds.
  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams.close(10, TimeUnit.SECONDS)
  }))
}

/** A person record, as extracted from the JSON values read by `readAndWriteJson`. */
final case class Person(name: String, age: Int, email: String)

/** The name and email of a [[Person]]; this is the value that gets serialized to the output topic. */
final case class PersonNameAndEmail(name: String, email: String)
Shivansh
  • 3,454
  • 23
  • 46