
I am implementing some Spark Structured Streaming transformations from a Parquet data source. In order to read the data into a streaming DataFrame, you have to specify the schema explicitly (it cannot be inferred automatically). The schema is really complex, and writing it out by hand would be tedious and error-prone.

Can you suggest a workaround? Currently I create a batch DataFrame beforehand (from the same data source), let Spark infer its schema, save that schema to a Scala object, and use it as the input for the Structured Streaming reader.
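Roughly, my current workaround looks like this (a minimal sketch; the path and variable names are placeholders):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    val spark = SparkSession.builder().getOrCreate()

    // Batch read of the same Parquet data, only to let Spark infer the schema
    val inferredSchema: StructType = spark.read.parquet("/data/events").schema

    // Reuse the inferred schema for the streaming reader
    val streamDf = spark.readStream
      .schema(inferredSchema)
      .parquet("/data/events")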

I don't think this is a reliable or well-performing solution. Please suggest how to generate the schema code automatically, or how to persist the schema in a file and reuse it.

BuahahaXD

1 Answer


From the docs:

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.
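For example, a minimal sketch of using that setting (the path is a placeholder):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Re-enable schema inference for file-based streaming sources (ad-hoc use only)
    spark.conf.set("spark.sql.streaming.schemaInference", "true")

    // With the setting enabled, no explicit schema is required here
    val streamDf = spark.readStream.parquet("/data/events")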

You could also open a shell, read one of the Parquet files with automatic schema inference enabled, and save the schema to JSON for later reuse. You only have to do this once, so it might be faster / more efficient than the similar-sounding workaround you're using now.
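A minimal sketch of that save-to-JSON idea (paths and variable names are placeholders, not taken from the original answer):

    import java.nio.file.{Files, Paths}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{DataType, StructType}

    val spark = SparkSession.builder().getOrCreate()

    // One-time step: infer the schema from a batch read and persist it as JSON
    val inferred: StructType = spark.read.parquet("/data/events").schema
    Files.write(Paths.get("schema.json"), inferred.json.getBytes("UTF-8"))

    // Later, in the streaming job: load the JSON and rebuild the StructType
    val jsonString = new String(Files.readAllBytes(Paths.get("schema.json")), "UTF-8")
    val schema = DataType.fromJson(jsonString).asInstanceOf[StructType]

    val streamDf = spark.readStream
      .schema(schema) // DataStreamReader.schema takes a StructType (or a DDL string)
      .parquet("/data/events")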

Ryan Hoffman
  • Hey, I think saving the schema to JSON and reusing it is a great idea, but the example that you linked doesn't work for me. I get the following error when trying to use the schema: overloaded method value schema with alternatives: (schemaString: String)org.apache.spark.sql.streaming.DataStreamReader (schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.streaming.DataStreamReader cannot be applied to (Option[org.apache.spark.sql.types.StructType]) – BuahahaXD Nov 12 '20 at 10:07
  • What about popping open a spark-shell and just loading one of your parquet files as a plain, old, non-streaming DataFrame using SparkSession.read.parquet(), and using its schema? – Ryan Hoffman Nov 13 '20 at 00:13
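
For completeness, a hedged sketch of how the Option[StructType] error above could be resolved (assuming the JSON parsing was wrapped in a Try/Option somewhere; paths and names are placeholders):

    import java.nio.file.{Files, Paths}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{DataType, StructType}
    import scala.util.Try

    val spark = SparkSession.builder().getOrCreate()

    // DataStreamReader.schema accepts a StructType or a DDL string, never an Option,
    // so an Option[StructType] has to be unwrapped before it is passed to the reader.
    val jsonString = new String(Files.readAllBytes(Paths.get("schema.json")), "UTF-8")
    val maybeSchema: Option[StructType] =
      Try(DataType.fromJson(jsonString).asInstanceOf[StructType]).toOption

    val schema: StructType = maybeSchema.getOrElse(
      throw new IllegalStateException("could not parse schema JSON"))

    val streamDf = spark.readStream
      .schema(schema) // pass the unwrapped StructType
      .parquet("/data/events")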