My code is something like:
df = spark.readStream.option("header", "true") \
    .schema(df_schema) \
    .csv(df_file)
df2 = df.filter(df.col == 1)
df3 = df2.withColumn("new_col", udf_f(df2.some_col))
dfc = df3.where(df3.new_col == 2).count()
query = dfc.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
I get the error Queries with streaming sources must be executed with writeStream.start() at the dfc line, but I'm not sure what I'm doing wrong. Does Spark Structured Streaming not support chained queries like this? As far as I know, I'm not doing any branching.
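For reference, if the issue is that count() is a batch action, my guess is that the streaming way to count rows would be an aggregation instead, something like the untested sketch below (groupBy().count() and outputMode("complete") are my assumptions, not code I have run):

# Untested sketch: express the count as a streaming aggregation rather than an action
dfc = df3.where(df3.new_col == 2).groupBy().count()
# Streaming aggregations generally need the "complete" (or "update") output mode
query = dfc.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()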
EDIT:
By removing count() from the dfc line, I got a new error, StreamingQueryException: Exception thrown in awaitResult, arising from the query.awaitTermination() call. Any idea why count() did not work, and why the new error arose?
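To be concrete, after removing count() the tail of the code is roughly:

dfc = df3.where(df3.new_col == 2)
query = dfc.writeStream.outputMode("append").format("console").start()
query.awaitTermination()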
EDIT 2:
If I log to the console directly, without running all the intermediate queries after df, it works. However, every time I try to run an additional query, the StreamingQueryException is raised.
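For example, a minimal version that writes df straight to the console, roughly as below, runs fine:

query = df.writeStream.outputMode("append").format("console").start()
query.awaitTermination()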