I have an S3 bucket with the structure //storage-layer/raw/__SOME_FOLDERS__, e.g. //storage-layer/raw/GTest and //storage-layer/raw/HTest. Each of these folders can in turn contain a few other folders, such as raw/GTest/abc and raw/HTest/xyz. There is no overlap between the folders abc and xyz under GTest and HTest.
I have successfully set up a Spark Structured Streaming query that monitors raw/GTest/abc for incoming parquet files and writes the results out to the console.
def process_row(df, epoch_id):
    df.show()
# Structured Streaming
(
self.spark
.readStream
.format("parquet")
.option("maxFilesPerTrigger", 20)
.option("inferSchema", "true")
.load("s3a://storage-layer/raw/GTest/abc/*")
.writeStream
.format("console")
.outputMode("append")
.trigger(processingTime="5 seconds")
# .foreachBatch(process_row)
.start()
.awaitTermination()
)
My problem is: how can I set up one Structured Streaming app to readStream from the upper folder storage-layer/raw/*, do some processing on it, and save the result into a completely different folder / bucket in S3?
I have taken a look at foreachBatch (commented out above), but I'm not sure how to set it up so that it achieves the end result. I also get the error message Unable to infer schema for Parquet. It must be specified manually.
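From what I understand, streaming file sources need the schema up front, so I imagine something like the following would replace the inferSchema option when reading from the upper folder (the columns below are just placeholders, not my real data):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Placeholder schema -- the real columns depend on the parquet files under raw/
schema = StructType([
    StructField("id", StringType(), True),
    StructField("value", IntegerType(), True),
])

stream_df = (
    self.spark
    .readStream
    .format("parquet")
    .schema(schema)                          # explicit schema instead of inferSchema
    .option("maxFilesPerTrigger", 20)
    .load("s3a://storage-layer/raw/*/*")     # watch the upper folder
)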
Example of end result:

- parquet files saving into S3 under storage-layer/raw/GTest/abc -> structured streamed + processed into storage-layer/processed/GTest/abc as parquet files.
- parquet files saving into S3 under storage-layer/raw/HTest/xyz -> structured streamed + processed into storage-layer/processed/HTest/xyz as parquet files.
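Roughly what I was trying with foreachBatch, using input_file_name() to route each micro-batch by the folder its files came from and mirror that path under processed/. This is only a sketch of the idea, not working code; the regex and checkpoint path are assumptions, and stream_df is the explicit-schema readStream sketched above:

from pyspark.sql.functions import input_file_name, regexp_extract

def route_batch(batch_df, epoch_id):
    # Tag each row with the sub-path it arrived from (e.g. GTest/abc),
    # then mirror that sub-path under the processed/ prefix.
    tagged = batch_df.withColumn(
        "sub_path",
        regexp_extract(input_file_name(), r"raw/(.+)/[^/]+$", 1),
    )
    for (sub_path,) in tagged.select("sub_path").distinct().collect():
        (
            tagged
            .filter(tagged.sub_path == sub_path)
            .drop("sub_path")
            .write
            .mode("append")
            .parquet(f"s3a://storage-layer/processed/{sub_path}")
        )

(
    stream_df
    .writeStream
    .foreachBatch(route_batch)
    .option("checkpointLocation", "s3a://storage-layer/checkpoints/raw-to-processed")
    .trigger(processingTime="5 seconds")
    .start()
    .awaitTermination()
)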