I am trying to setup a structured streaming job with a map() transformation that make REST API calls. Here are the details:
(1)
df=spark.readStream.format('delta') \
.option("maxFilesPerTrigger", 1000) \
.load(f'{file_location}')
(2)
respData=df.select("resource", "payload").rdd.map(lambda row: put_resource(row[0], row[1])).collect()
respDf=spark.createDataFrame(respData, ["resource", "status_code", "reason"])
(3)
respDf.writeStream \
.trigger(once=True) \
.outputMode("append") \
.format("delta") \
.option("path", f'{file_location}/Response') \
.option("checkpointLocation", f'{file_location}/Response/Checkpoints') \
.start()
However, I got an error: Queries with streaming sources must be executed with writeStream.start() on step (2).
Any help will be appreciated. Thank you.