1

I have a s3 bucket with the structure //storage-layer/raw/__SOME_FOLDERS__. EG: //storage-layer/raw/GTest and //storage-layer/raw/HTest. In these folders, there is the potential to have a few other folders as well, such as raw/GTest/abc, raw/HTest/xyz. There will not be an overlap in folders abc and xyz from GTest or HTest.

I am successful in setting up a spark structured streaming to monitor raw/GTest/abc for parquet files coming in, and writing the results out to console.

def process_row(df, epoch_id):
    df.show()


# Structured Streaming 
(
        self.spark
        .readStream
        .format("parquet")
        .option("maxFilesPerTrigger", 20)            
        .option("inferSchema", "true")
        .load("s3a://storage-layer/raw/GTest/abc/*")

        .writeStream
        .format("console")
        .outputMode("append")
        .trigger(processingTime="5 seconds")
        # .foreachBatch(process_row)
        .start()
        .awaitTermination()
)

My problem is, how can i set up 1 structured streaming app to readStream from the upper folder: storage-layer/raw/* do some processing on it, and save it into a completely different folder / bucket in s3?

I have taken a look at foreachBatch above, but i'm not sure how to set it up such that it can achieve the end result. I get the error message Unable to infer schema for Parquet. It must be specified manually.

Example of end result:

  • parquet files saving into s3 storage-layer/raw/GTest/abc -> structured streamed + processed into storage-layer/processed/GTest/abc as parquet file.

  • parquet files saving into s3 storage-layer/raw/HTest/xyz -> structured streamed + processed into storage-layer/processed/HTest/xyz as parquet file.

jake wong
  • 4,909
  • 12
  • 42
  • 85

1 Answers1

0
  • For Unable to infer the schema for Parquet. It must be specified manually. Spark stream cannot infer schema automatically as we see in static read. So need to provide schema explicitly for the data at s3a://storage-layer/raw/* programmatically or stored in an external file. Have a look at this.
  • You have two different source locations so need two readStream. If the data at storage-layer/raw/* has the same schema and you want to achieve it using only one readStream then include an extra field as stream_source_path at writing process and the process which writes data at storage-layer/raw/* should populate this field. So now your streaming app knows from which source location data is being read and you can derive two data frames based on stream_source_path value from a single readStream.
  • The above two data frames can be now written to separate sinks.
  • Spark has out-of-box support for File sink and you want to write data in parquet format. So you don't need foreach or foreachbatch implementation.

Code snippet -


    val schemaObj = new Schema.Parser().parse(avsc_schema_file)
    val schema = SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]

    val stream = sparkSession.readStream
      .schema(schema)
      .format("parquet")
      .option("cleanSource","archive")
      .option("maxFilesPerTrigger", "1")
      .option("sourceArchiveDir","storage-layer/streaming_source_archive/")
      .option("latestFirst", value = true)
      .load("s3a://storage-layer/raw/*")

val df_abc = stream.filter(col("stream_source_path") === "storage-layer/raw/GTest/abc")

val df_xyz = stream.filter(col("stream_source_path") === "storage-layer/raw/GTest/xyz")

df_abc.writeStream
    .format("parquet")        
    .option("path", "storage-layer/processed/GTest/abc")
    .option("checkpointLocation", "storage-layer/streaming_checkpoint/GTest/abc")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .start()

df_xyz.writeStream 
    .format("parquet")        
    .option("path", "storage-layer/processed/GTest/xyz")
    .option("checkpointLocation", "storage-layer/streaming_checkpoint/GTest/xyz")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .start()

sparkSession.streams.active.foreach(x => x.awaitTermination())

Vikramsinh Shinde
  • 2,742
  • 2
  • 23
  • 29