
I am using Spark Structured Streaming to process data from Kafka. Each message is transformed to JSON. However, Spark needs an explicit schema to obtain columns from the JSON. Spark Streaming with DStreams allows doing the following:

spark.read.json(spark.createDataset(jsons))

where jsons is an RDD[String]. In the case of Spark Structured Streaming, a similar approach

df.sparkSession.read.json(jsons)

(where jsons is a Dataset[String])

results in the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

I assume that read triggers execution immediately, rather than waiting for start, but is there a way to work around this?

gorros

1 Answer


To stream JSON from Kafka into a DataFrame, you need to do something like this:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.from_json
  import org.apache.spark.sql.types.StructType
  import spark.implicits._

  case class Colour(red: Int, green: Int, blue: Int)

  // Explicit schema matching the fields of the JSON messages (and the Colour case class)
  val colourSchema: StructType = new StructType()
    .add("red", "int")
    .add("green", "int")
    .add("blue", "int")

  val streamingColours: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker list
    .option("subscribe", "colours")                       // placeholder topic name
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json($"value", colourSchema).as("colour"))

  streamingColours
    .writeStream
    .outputMode("append") // "complete" is only supported for aggregations; a plain projection uses "append"
    .format("console")
    .start()

This should create a streaming DataFrame, and show the results of reading from Kafka on the console.

I do not believe it is possible to use schema inference with streaming datasets, and this makes sense: schema inference looks at a large set of data to work out what the types are. With a streaming dataset, the schema inferred from the first message might differ from the schema of the second message, and Spark needs one schema for all elements of the DataFrame.

What we have done in the past is to process a batch of JSON messages with Spark's batch API, use schema inference there, and then export that schema for use with streaming datasets.
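A rough sketch of that pattern is below, assuming a small sample of the JSON messages has been saved to a file; the file path, broker list, and topic name are placeholders:

  import org.apache.spark.sql.functions.from_json
  import org.apache.spark.sql.types.{DataType, StructType}
  import spark.implicits._

  // 1. Batch step: infer the schema from a sample of the JSON messages.
  val sample = spark.read.json("/path/to/sample-messages.json") // placeholder path
  val schemaJson: String = sample.schema.json // export, e.g. save this string to a file

  // 2. Streaming step: load the exported schema and use it with from_json.
  val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

  val stream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker list
    .option("subscribe", "my-topic")                      // placeholder topic name
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json($"value", schema).as("data"))

The schema's JSON representation can be kept in version control or a config store, so the streaming job does not need the sample data at runtime.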

Patrick McGloin