
I'm trying to read messages from Kafka (version 0.10) in Spark and print them.

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

ds1.collect.foreach(println)
ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()

I am getting this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

– shivali

5 Answers


You are branching the query plan: from the same ds1 you are trying to:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

But you are only calling .start() on the second branch, leaving the first one dangling without a termination. Worse, collect is a batch action and cannot be run against a streaming source at all, which is exactly the exception you are getting back.

The solution is to start both branches and await termination.

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

// collect is not supported on a streaming Dataset, so the
// "print it" branch has to become a streaming query of its own
val query1 = ds1.writeStream
  .format("console")
  .start()
val query2 = ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()  // fine here: printSchema does not execute the query
query1.awaitTermination()
query2.awaitTermination()
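
If you do want the collect-and-print behavior from the question, here is a minimal sketch using foreachBatch (an assumption on my part: this requires Spark 2.4+, where it was introduced), which hands you each micro-batch as a plain, non-streaming DataFrame:

import org.apache.spark.sql.DataFrame

val query3 = ds1.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // batchDF is an ordinary batch DataFrame, so collect is allowed here
    batchDF.collect.foreach(println)
  }
  .start()

query3.awaitTermination()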
– ssice

I fixed the issue by using the following code. It works because nothing calls a batch action such as collect on the streaming DataFrame, and the query is awaited after start().

val df = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "streamTest2")
  .load()

val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
– Rajeev Rathor

I struggled a lot with this issue. I tried each of the suggested solutions from various blogs, but in my case there were a few statements between calling start() on the query and finally calling awaitTermination() at the end, and that is what caused this.

Please try it in this fashion; it works perfectly for me. Working example:

val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()

If you write it in this way, it will cause an exception/error:

val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()

// some statement
// some statement

query.awaitTermination()

This will throw the given exception and close your streaming driver.

– Rajeev Rathor
  • Worked for me using Java structured streaming. I had no `//some statement`s at all - just saved the StreamingQuery to a variable and then immediately called `sQueryVar.start()` and encountered the same problem. This solved it - thanks! – Taylor Dec 28 '18 at 22:32
  • Does anyone know what the cause of this is? Why do the additional lines between the start() and the awaitTermination() cause an issue? – Brandon Jul 22 '20 at 19:30

Remove ds1.collect.foreach(println) and ds1.printSchema(); use outputMode and awaitAnyTermination for the background process, waiting until any of the queries on the associated spark.streams has terminated.

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .config("spark.master", "local[*]")
  .getOrCreate()

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

val consoleOutput1 = ds1.writeStream
  .outputMode("update")
  .format("console")
  .start()

spark.streams.awaitAnyTermination()

+---+-----+-----+---------+------+
|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+
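
The empty batch above just means that no records have arrived on topicA yet. As a quick way to produce test input, here is a minimal sketch using the plain Kafka producer client (an assumption: the kafka-clients jar is on the classpath):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// publish one test record so the console sink has something to show
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("topicA", "hello"))
producer.close()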
– SLU
  • Sorry, just out of curiosity: why "kindly"? You are answering OP's question, not requesting :) – Til Jan 24 '19 at 05:32
  • This just means that no data is coming from Kafka; please try sending data to that topic. – NickyPatel Oct 27 '20 at 18:19

I was able to resolve this issue with the following code. In my scenario, I had multiple intermediate DataFrames, which were basically transformations made on the inputDF.

val query = joinedDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Complete())
  .start()
  .awaitTermination()

joinedDF is the result of the last transformation performed.
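
For context, here is a minimal sketch of what such a pipeline might look like. The inputDF source and the aggregation step are assumptions for illustration (Complete mode requires an aggregation somewhere in the plan), not the exact transformations from my job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder
  .appName("IntermediateTransformations")  // hypothetical app name
  .master("local[*]")
  .getOrCreate()

// inputDF: the raw Kafka stream (assumed source and topic)
val inputDF = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

// intermediate DataFrames: cast the binary value, then aggregate
val valuesDF = inputDF.selectExpr("CAST(value AS STRING) AS value")
val joinedDF = valuesDF.groupBy("value").count()

joinedDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Complete())
  .start()
  .awaitTermination()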

– P696