Hi, I'm reading from a Kafka topic and I want to process the received data (tokenization, filtering out unnecessary data, removing stop words) and finally write the result back to another Kafka topic.
// read from kafka
val readStream = existingSparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
  .load()
val df = readStream.selectExpr("CAST(value AS STRING)")
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))
// perform the data processing steps (tokenization etc.) and return cleanedDataframe as the final result
// write back to kafka
val writeStream = cleanedDataframe
  .writeStream
  .outputMode("append")
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("topic", "writing.val")
  .start()
writeStream.awaitTermination()
With this code I get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
I then edited my code as follows to read from Kafka and write to the console:
// read from kafka
val readStream = existingSparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
  .load()
// write to console
val df = readStream.selectExpr("CAST(value AS STRING)")
val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start().awaitTermination()
// then perform the data processing part as mentioned in the first half
With the second method, data was displayed continuously in the console, but the code never reached the data processing part. How can I read from a Kafka topic, perform some actions (tokenization, removing stop words) on the received data, and finally write the result back to a new Kafka topic?
EDIT
The stack trace for the error above points at df.show(false) in the first code snippet.
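A minimal sketch of how such a pipeline might be arranged, assuming Spark ML's Tokenizer and StopWordsRemover are acceptable for the cleaning step. Since show() is a batch action, it is left out here, and every transformation sits between load() and writeStream.start(). The schema, the broker address localhost:9092, the checkpoint path and the object name KafkaCleanSketch are hypothetical placeholders, not taken from the code above; the real schema would come from mySchema.defineSchema().

```scala
import org.apache.spark.ml.feature.{StopWordsRemover, Tokenizer}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

object KafkaCleanSketch {
  // Hypothetical schema standing in for mySchema.defineSchema()
  val schema: StructType = new StructType()
    .add("payload", new StructType().add("Text", StringType))

  // Pure DataFrame-to-DataFrame transformation; no actions such as show() are called
  def clean(raw: DataFrame): DataFrame = {
    val text = raw
      .selectExpr("CAST(value AS STRING) AS value")
      .select(from_json(col("value"), schema).alias("parsed_value"))
      .select(col("parsed_value.payload.Text").alias("text"))
      .filter(col("text").isNotNull) // Tokenizer cannot handle null input

    // Lower-case and split the text on whitespace
    val tokens = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("tokens")
      .transform(text)

    // Drop the default (English) stop words
    val filtered = new StopWordsRemover()
      .setInputCol("tokens")
      .setOutputCol("filtered")
      .transform(tokens)

    // Join the remaining tokens into the single "value" column the Kafka sink expects
    filtered.select(concat_ws(" ", col("filtered")).alias("value"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-clean-sketch").getOrCreate()
    val hostAddress = "localhost:9092" // assumption: placeholder broker address

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw")
      .option("startingOffsets", "earliest") // read from the beginning, for dev/testing
      .load()

    val query = clean(raw).writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .option("checkpointLocation", "/tmp/kafka-clean-sketch-checkpoint") // assumption: placeholder path
      .start()

    query.awaitTermination()
  }
}
```

The Kafka sink expects a string or binary column named value and a checkpoint location, which is why the cleaned tokens are joined back into a single value column in this sketch.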