
I'm trying to write a test for a Spark Structured Streaming example that consumes data from Kafka. I'm using EmbeddedKafka for this.

  import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
  import org.apache.spark.sql.SparkSession

  implicit val config = EmbeddedKafkaConfig(kafkaPort = 12345)

  // start an in-memory Kafka broker and create the test topic
  EmbeddedKafka.start()
  EmbeddedKafka.createCustomTopic(topic)

  println(s"Kafka Running ${EmbeddedKafka.isRunning}")

  val spark = SparkSession.builder.appName("StructuredStreaming").master("local[2]").getOrCreate
  import spark.implicits._

  // subscribe to the topic on the embedded broker as a streaming source
  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:12345")
    .option("subscribe", topic)
    .load()

  // pushing data to kafka before the query starts
  vfes.foreach(e => {
    val json = ...
    EmbeddedKafka.publishStringMessageToKafka(topic, json)
  })

  val query = df.selectExpr("CAST(value AS STRING)")
    .as[String]
    .writeStream.format("console")

  query.start().awaitTermination()
  spark.stop()
  EmbeddedKafka.stop()

When I run this, it keeps running and never prints anything to the console, and I cannot figure out why that is. I also tried terminating Kafka by calling EmbeddedKafka.stop() before calling stop on the stream.

  • Kafka topics are by definition endless. You need a different stopping criterion, such as an input timeout. But it's been two years, so perhaps you've reached that conclusion already :) – Hristo Iliev Apr 06 '20 at 14:53
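
The comment's suggestion can be made concrete. One test-friendly stopping criterion (my own illustration, not something from the question) is StreamingQuery.processAllAvailable, a method Spark documents as intended for testing; a minimal sketch reusing the df from the question:

  // Stop once everything currently in the topic has been processed,
  // instead of waiting for an endless stream to terminate on its own.
  val started = df.selectExpr("CAST(value AS STRING)")
    .writeStream
    .format("console")
    .start()

  started.processAllAvailable() // blocks until all data available in the
                                // source so far has reached the sink
  started.stop()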

1 Answer


Try setting a timeout:

  query.start().awaitTermination(3000)

where 3000 is the timeout in milliseconds.
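
With a timeout, awaitTermination returns a Boolean (true if the query terminated within that time) instead of blocking forever, so the test can proceed to clean-up. A minimal sketch of the end of the test, assuming the query writer from the question:

  val started = query.start()
  // returns after at most 3 seconds rather than blocking on an endless topic
  val terminated = started.awaitTermination(3000)
  started.stop()

  spark.stop()
  EmbeddedKafka.stop()

Note that a timeout alone may not make anything appear on the console: the Kafka source's startingOffsets option defaults to "latest", so messages published before the query starts, as in the question, are skipped; tests typically set it to "earliest".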