I'm new to Scala and to Spark's Kafka integration, and I'm running into an issue with logging. I have tried several logging libraries, but they all produce the same error from Spark.
The error is the following: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
My code is the following:
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:9092, host2:9092")
.option("subscribe", "test")
.load()
// Dataframe of the 'value' column in the original dataframe from above
val msg = df.select("value").as[String]
// modify_msg is a string produced by Extract_Info
val modify_msg = Extract_Info(msg.first.getString(0)).toString()
//Error occurs here. I also tried different logger libraries like SLF4J
println(modify_msg)
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
Is there a way to print or log the results? As far as I can tell, writeStream.start() only works on DataFrames, and I could not get it to print a string. Any help would be much appreciated.
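
One possible direction, offered as a minimal sketch rather than a confirmed fix: a streaming Dataset cannot be collected on the driver with first, so the string transformation can instead be applied inside the query with map, leaving the console sink to print each transformed value. The sketch assumes the source is opened with spark.readStream. The names PrintKafkaValues and extractInfo are hypothetical; extractInfo only stands in for Extract_Info, whose real logic is not shown here.

```scala
import org.apache.spark.sql.SparkSession

object PrintKafkaValues {
  // Hypothetical stand-in for Extract_Info; replace with the real logic.
  // Kafka values can be null, so guard before calling String methods.
  def extractInfo(raw: String): String =
    Option(raw).map(_.trim).getOrElse("")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PrintKafkaValues")
      .getOrCreate()
    import spark.implicits._ // provides the String encoder for as[String] and map

    // Streaming source: readStream rather than read
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
      .option("subscribe", "test")
      .load()

    // The Kafka 'value' column is binary, so cast it before treating it as a String.
    // The map runs as part of the streaming query, not on the driver.
    val modified = df
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(extractInfo)

    // The console sink prints each micro-batch of transformed strings
    val query = modified.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

The key design choice is that nothing calls first, collect, or println on the streaming Dataset before start(); every transformation stays part of the query that writeStream launches.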