
I'm new to Scala and to Spark's Kafka integration, and I'm running into an issue with logging. I have tried several different logging libraries, but they all produce the same error from Spark.

The error is the following: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

My code is the following:

val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092, host2:9092")
  .option("subscribe", "test")
  .load()

// Dataframe of the 'value' column in the original dataframe from above
val msg = df.select("value").as[String]

// modify_msg is a string produced by Extract_Info
val modify_msg = Extract_Info(msg.first.getString(0)).toString()

// Error occurs here. I also tried other logging libraries, such as SLF4J
println(modify_msg)


val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()

I am wondering if there is a way to print or log the results. The issue is that the writeStream.start() function only works on DataFrames, and I could not get it to print a string. Any help would be much appreciated.

Misumi
  • Please have a look at https://stackoverflow.com/questions/40609771/queries-with-streaming-sources-must-be-executed-with-writestream-start – hagarwal Jan 12 '21 at 04:27
  • I took a look at the post you linked, but I don't think it offers a solution. As I understand it, the tl;dr is that you cannot output anything between loading and starting the stream; if you do, it causes the error. Please correct me if I am wrong, but the ```.writeStream``` function only accepts a DataFrame as input. – Misumi Jan 12 '21 at 06:26
  • What exactly are you trying to log? At the moment it looks like you are printing the first row of the streaming Dataframe and then the entire Dataframe to the console (which should have the same output location as the print statement). – Michael Heil Jan 12 '21 at 07:43
  • Hi Mike, I'm trying to verify the transformations that the ```Extract_Info``` function I have written applies to the message. The idea of logging this is to understand whether the bug is in my code here or further upstream in the ```Extract_Info``` function. But I feel confident that ```Extract_Info``` is working, since it works on the string representation of the same message value. – Misumi Jan 12 '21 at 16:20
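
For reference, one way to log per-message results without calling an action on the streaming DataFrame directly is to do the logging inside foreachBatch, where each micro-batch arrives as an ordinary, non-streaming Dataset. The following is only a sketch under some assumptions: Spark 2.4 or later (where foreachBatch is available), the spark-sql-kafka connector on the classpath, and a hypothetical extractInfo function standing in for Extract_Info, whose real body is not shown in the question.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object LogStreamSketch {
  // Hypothetical stand-in for the asker's Extract_Info; the real logic is unknown.
  def extractInfo(raw: String): String = raw.trim.toUpperCase

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("log-stream-sketch").getOrCreate()
    import spark.implicits._

    // readStream (rather than read) creates a streaming source.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
      .option("subscribe", "test")
      .load()

    // Kafka's value column is binary, so cast it before treating it as a String.
    val msg: Dataset[String] = df.selectExpr("CAST(value AS STRING)").as[String]

    // Inside foreachBatch each micro-batch is a regular Dataset, so actions
    // such as collect() are allowed. An explicitly typed function value avoids
    // the Scala/Java overload ambiguity of foreachBatch.
    val logBatch: (Dataset[String], Long) => Unit = (batch, batchId) =>
      batch.collect().foreach { raw =>
        println(s"batch $batchId: ${extractInfo(raw)}")
      }

    val query = msg.writeStream
      .outputMode("append")
      .foreachBatch(logBatch)
      .start()

    query.awaitTermination()
  }
}
```

Note that collect() pulls every record of the micro-batch to the driver, so this is meant for debugging small volumes only. The println call could be swapped for any logger, since the function body runs on the driver.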

0 Answers