I am trying to set up a Structured Streaming job with a map() transformation that makes REST API calls. Here are the details:

(1)
df=spark.readStream.format('delta') \
.option("maxFilesPerTrigger", 1000) \
.load(f'{file_location}') 

(2)
respData=df.select("resource", "payload").rdd.map(lambda row: put_resource(row[0], row[1])).collect()
respDf=spark.createDataFrame(respData, ["resource", "status_code", "reason"])

(3)
respDf.writeStream \
.trigger(once=True) \
.outputMode("append") \
.format("delta") \
.option("path", f'{file_location}/Response') \
.option("checkpointLocation", f'{file_location}/Response/Checkpoints') \
.start()

However, step (2) fails with this error: Queries with streaming sources must be executed with writeStream.start().

Any help will be appreciated. Thank you.

GLP

1 Answer

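You also have to execute the stream on df, meaning df.writeStream.start(). Step (2) calls .rdd and .collect() on a streaming DataFrame, which Spark does not allow outside a started streaming query.

There is a similar thread here: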

Queries with streaming sources must be executed with writeStream.start();
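
To make this concrete, one option in PySpark is foreachBatch, which hands each micro-batch to a function as a static DataFrame, so .rdd and createDataFrame are allowed inside it. This is only a minimal sketch, assuming the paths from the question. put_resource below is a placeholder for the asker's REST call, and call_api, make_batch_handler and start_query are hypothetical names, not anything from the original code:

```python
def put_resource(resource, payload):
    # Placeholder (assumption): stands in for the REST call from the question.
    # It must return a (resource, status_code, reason) tuple.
    return (resource, 200, "OK")


def call_api(rows):
    # Runs on the executors: one REST call per row of a partition.
    for row in rows:
        yield put_resource(row[0], row[1])


def make_batch_handler(spark, file_location):
    def handle_batch(batch_df, batch_id):
        # batch_df is a static DataFrame here, so .rdd is permitted.
        resp_rdd = batch_df.select("resource", "payload").rdd.mapPartitions(call_api)
        # An explicit schema avoids inference failures on empty batches.
        resp_df = spark.createDataFrame(
            resp_rdd, "resource string, status_code int, reason string"
        )
        resp_df.write.format("delta").mode("append").save(f"{file_location}/Response")

    return handle_batch


def start_query(spark, file_location):
    df = (
        spark.readStream.format("delta")
        .option("maxFilesPerTrigger", 1000)
        .load(file_location)
    )
    # The streaming source is only ever consumed through writeStream.start().
    return (
        df.writeStream.trigger(once=True)
        .foreachBatch(make_batch_handler(spark, file_location))
        .option("checkpointLocation", f"{file_location}/Response/Checkpoints")
        .start()
    )
```

Calling start_query(spark, file_location) would replace steps (1) to (3). Note that foreachBatch gives at-least-once guarantees by default, so a retried batch may repeat the REST calls.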

Zohar Stiro
  • So you mean it is not possible to run some map transformation on the rdd in between readStream and writeStream? – GLP Oct 08 '20 at 04:58
  • You are not running just a map transformation. You are collecting the results and using them as input to create a new DataFrame. In fact, you have two streams running, and you should start both. If you would like to do a transformation on a streaming DataFrame, you can just do spark.readStream..load().map().writeStream.start – Zohar Stiro Oct 08 '20 at 06:44
  • It is not possible to run `map` in PySpark Structured Streaming. You must use a `udf`. – ecoe Jun 02 '23 at 00:07
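
For reference, the udf route mentioned in the last comment could look roughly like the sketch below. It is an illustration under assumptions, not the commenter's code: put_resource is again a placeholder for the REST call, with_responses is a hypothetical helper name, and the column types are guessed from the names in the question.

```python
def put_resource(resource, payload):
    # Placeholder (assumption): stands in for the REST call from the question.
    return (resource, 200, "OK")


def with_responses(df):
    # Imported here so the sketch can be read and loaded without a Spark install.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    resp_schema = StructType([
        StructField("resource", StringType()),
        StructField("status_code", IntegerType()),
        StructField("reason", StringType()),
    ])
    # The call has side effects, so mark the udf nondeterministic to keep the
    # optimizer from duplicating it.
    put_resource_udf = udf(put_resource, resp_schema).asNondeterministic()
    return (
        df.select(put_resource_udf(col("resource"), col("payload")).alias("resp"))
        .select("resp.*")
    )
```

The result of with_responses(df) is still a streaming DataFrame, so it can be passed to the writeStream call from step (3) in place of respDf.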