
My code is something like

df = spark.readStream.option("header","true") \
    .schema(df_schema)\
    .csv(df_file)
df2 = df.filter(df.col == 1)
df3 = df2.withColumn("new_col", udf_f(df2.some_col))
dfc = df3.where(df3.new_col == 2).count()
query = dfc.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

I get the error message Queries with streaming sources must be executed with writeStream.start() at the dfc line, but I'm not sure what I'm doing wrong. Does Spark Structured Streaming not support chained queries like this? As far as I know, I'm not doing any branching.

EDIT:

By removing count() from the dfc line, I got a new error, StreamingQueryException: Exception thrown in awaitResult, raised by the query.awaitTermination() call. Any idea why count() did not work, and why this new error arose?

EDIT 2:

If I log to the console directly, without running the intermediate queries after df, it works. However, every time I try to run an additional query, the StreamingQueryException is raised.


1 Answer


Due to the nature of Structured Streaming, it's not possible to get the count in the same way as for static DataFrames. When a stream is created, Spark polls the sources on a trigger for new data. If there is any, Spark splits it into small DataFrames (micro-batches) and passes them along the stream (transformations, aggregations, output).

If you need the number of records, you can add a listener to receive progress updates and read the number of input rows in onQueryProgress(QueryProgressEvent event).
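
A minimal sketch of that listener approach in PySpark (assuming Spark 3.4+, where StreamingQueryListener is exposed to Python; on earlier versions the listener API is Scala/Java-only, and polling query.lastProgress["numInputRows"] on the driver is the closest Python-side alternative):

from pyspark.sql.streaming import StreamingQueryListener

class RowCountListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # numInputRows = rows ingested in the last micro-batch
        print("rows in last batch:", event.progress.numInputRows)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

# register before starting the streaming query
spark.streams.addListener(RowCountListener())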

It's hard to say why you are getting a StreamingQueryException, since filter() and withColumn() work properly in Structured Streaming. Did you see any other errors in the console that could cause Exception thrown in awaitResult?

By the way, if you have multiple streams in a single session you should use spark.streams.awaitAnyTermination() to block until any one of them terminates.
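
For example, a hypothetical sketch reusing df3 from the question and adding a second, purely illustrative memory sink:

q1 = df3.writeStream.outputMode("append").format("console").start()
q2 = df3.writeStream.outputMode("append").format("memory").queryName("filtered_rows").start()

# blocks until the first of the two queries stops or fails
spark.streams.awaitAnyTermination()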

The following query should work properly:

from pyspark.sql.functions import col

query = spark.readStream \
    .option("header", "true") \
    .schema(df_schema) \
    .csv(df_file) \
    .filter(col("col") == 1) \
    .withColumn("new_col", udf_f(col("some_col"))) \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination()
# or spark.streams.awaitAnyTermination()
Yuriy Bondaruk
  • Thank you. The error causing the exception is `java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345)` If I remove the streaming component, that is, if I sample the data, the code works. – absolutelydevastated Mar 06 '18 at 03:00
  • I also used a `join` on the dataframe. Does that impact anything? – absolutelydevastated Mar 06 '18 at 04:11
  • The error you get looks like a [warning in metrics](https://stackoverflow.com/questions/47635510/strange-spark-error-on-aws-emr) that doesn't affect Spark data processing. If you are using AWS EMR, you should try downgrading it to 5.9.0 or just suppress the warning. – Yuriy Bondaruk Mar 06 '18 at 04:34
  • You can join static DataFrames to a stream, and starting from Spark 2.3 [stream-stream joins](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins) are supported. If you are using EMR, you should wait a bit for a new version, since the latest one only has Spark 2.2.1. – Yuriy Bondaruk Mar 06 '18 at 04:39
  • My error terminates the job. I get `..: java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:347) at scala.None$.get(Option.scala:345) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)` – absolutelydevastated Mar 06 '18 at 06:24
  • Is it perhaps because I joined two of the same frames? `x = df.select(df.col).limit(n)` `y = df.join(x, ['col'], 'leftsemi')` I initialise `x` before the stream starts. – absolutelydevastated Mar 06 '18 at 06:25
  • ^ Not the issue. I tried changing it to another DataFrame and it didn't work either. – absolutelydevastated Mar 06 '18 at 07:02