Hi, I'm reading from a Kafka topic and I want to process the received data (tokenization, filtering out unnecessary data, removing stop words) and finally write the result back to another Kafka topic.

import org.apache.spark.sql.functions.{col, from_json}

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

val df = readStream.selectExpr("CAST(value AS STRING)")
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result

// write back to kafka
val writeStream = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()
    writeStream.awaitTermination()

Then I get the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

I then edited my code as follows to read from Kafka and write to the console:

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)")
val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination();

// then perform the data processing part as mentioned in the first half

With this second approach, data was continuously displayed in the console, but it never ran through the data processing part. How can I read from a Kafka topic, then perform some actions (tokenization, removing stop words) on the received data, and finally write back to a new Kafka topic?

EDIT

The stack trace points at df.show(false) in the code above when the error occurs.


1 Answer

There are two common problems in your current implementation:

  1. Applying show in a streaming context
  2. Code after awaitTermination will not be executed

To 1.

The method show is an action (as opposed to a transformation) on a DataFrame. As you are dealing with streaming DataFrames, this causes an error, because streaming queries need to be started with start (just as the exception message is telling you).

To 2.

The method awaitTermination is a blocking call, which means that any subsequent code will not be executed until the query terminates.
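
Schematically, this is what happens in your second snippet:

val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination() // the driver blocks here until the console query stops

// the processing steps and the Kafka sink defined after this line are therefore never set up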

Overall Solution

If you want to read from and write to Kafka, and in between want to inspect the data being processed by printing it to the console, you can do the following:

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)")
df.writeStream
      .outputMode("append")
      .format("console")
      .start()

val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result
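// A minimal sketch of this step (one possible implementation, not the only way):
// lower-case the text, split it into tokens, and drop an example stop-word list
// using Spark's built-in SQL functions.
import org.apache.spark.sql.functions.{array_except, array_join, lower, split, typedLit}

val stopWords = typedLit(Seq("a", "an", "the", "is", "are")) // example list, replace with your own
val cleanedDataframe = df_text
      .withColumn("tokens", split(lower(col("text")), "\\s+"))      // tokenization
      .withColumn("tokens", array_except(col("tokens"), stopWords)) // stop-word removal
      .select(array_join(col("tokens"), " ").as("value"))           // Kafka sink expects a string `value` column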

// write back to kafka
// the columns `key` and `value` of the DataFrame `cleanedDataframe` will be used for producing the message into the Kafka topic.
val writeStreamKafka = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()

existingSparkSession.streams.awaitAnyTermination()

Note the existingSparkSession.streams.awaitAnyTermination() at the very end of the code, instead of calling awaitTermination directly after each start. Also, remember that the columns key and value of the DataFrame cleanedDataframe will be used for producing the messages to the Kafka topic. A key column is not required, however.
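
For example, if you wanted to set a message key as well (the column names here are just placeholders):

// `key` is optional; `value` must be a string or binary column
val keyedOutput = cleanedDataframe
      .selectExpr("CAST(id AS STRING) AS key", "CAST(value AS STRING) AS value")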

In addition, if you are using checkpointing (recommended), you need to set two different checkpoint locations: one for the console stream and one for the Kafka output stream. It is important to keep in mind that the two streaming queries run independently.
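
For example, a minimal sketch with placeholder checkpoint paths:

df.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/console") // placeholder path
      .start()

val writeStreamKafka = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .option("checkpointLocation", "/tmp/checkpoints/kafka") // placeholder path
      .start()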
