
I am trying to use the Structured Streaming approach (Spark Streaming based on the DataFrame/Dataset API) to load a stream of data from Kafka.

I use:

  • Spark 2.10
  • Kafka 0.10
  • spark-sql-kafka-0-10

The Spark Kafka data source has a predefined schema:

|key|value|topic|partition|offset|timestamp|timestampType|

My data comes in JSON format and is stored in the value column. I am looking for a way to extract the schema of the JSON in the value column and expand the received DataFrame into the columns stored in value. I tried the approach below, but it does not work:

val columns = Array("column1", "column2") // column names

val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .load()

val columnsToSelect = columns.map(x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect: _*)

// some analytics using the streaming DataFrame kafkaDF

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()

Here I am getting the exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337; because at the time the stream is created, the contents of value are not known...

Do you have any suggestions?


1 Answer


From Spark's perspective, value is just a byte sequence. Spark has no knowledge of the serialization format or content. To be able to extract a field you have to parse it first.

If the data is serialized as a JSON string, you have two options. You can cast value to StringType, use from_json, and provide a schema:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._ // for the $"..." column syntax

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
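
For example, a minimal end-to-end sketch, assuming the JSON payload carries the two fields named in the question and that both are strings (the types are an assumption), could flatten the parsed struct into top-level columns:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._ // for the $"..." column syntax

// assumed field types; adjust to match the actual JSON payload
val schema = StructType(Seq(
  StructField("column1", StringType),
  StructField("column2", StringType)
))

val parsedDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), schema).alias("data"))
  .select("data.*") // expand the struct into column1, column2

val query = parsedDF.writeStream.format("console").start()
query.awaitTermination()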

Alternatively, cast to StringType and extract fields by path using get_json_object:

import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???

val exprs = columns.map(c => get_json_object($"value".cast(StringType), s"$$.$c"))

rawKafkaDF.select(exprs: _*)

and then cast the extracted columns to the desired types.
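
For instance, the "cast later" step could look like the sketch below; the target types are assumptions, since the question does not state them:

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.{IntegerType, StringType}
import sparkSession.implicits._ // for the $"..." column syntax

val json = $"value".cast(StringType)

val typedDF = rawKafkaDF.select(
  get_json_object(json, "$.column1").alias("column1"),                   // extracted values arrive as strings
  get_json_object(json, "$.column2").cast(IntegerType).alias("column2")  // cast to the desired type
)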

  • FWIW, I find both alternatives clunky: the schema alternative requires specifying the schema in Spark-native code, so if we are shipping a complicated schema, the code has to parse a schema file (XSD etc.) and build this object; the get_json_object alternative forces popping out paths/fields individually, and I'm not sure what the performance penalty of that is. I'd have preferred if Apache Spark shipped with an easier way to cleanly accept a schema file and generate a Spark/Catalyst schema object (see the sketch after these comments). – Venki Apr 08 '19 at 13:46
  • Another way to get the schema for the messages (it's yet another clunky way): https://stackoverflow.com/questions/48361177/spark-structured-streaming-kafka-convert-json-without-schema-infer-schema – Venki Apr 08 '19 at 13:56
  • I am having a hard time doing this in Python. Can anyone suggest the same? – Monu Mar 11 '22 at 13:01
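
On the first comment's point about shipping a schema file: one workaround is to store the schema in Spark's own JSON representation (the output of StructType.json) and rebuild it at runtime with DataType.fromJson. A rough sketch, assuming a hypothetical schema.json file that holds such a representation:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import sparkSession.implicits._ // for the $"..." column syntax
import scala.io.Source

// schema.json is a hypothetical file containing the output of someStructType.json
val schemaJson = Source.fromFile("schema.json").mkString
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

// parse the Kafka value column with the loaded schema
val parsed = rawKafkaDF.select(
  from_json($"value".cast(StringType), schema).alias("data")
)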