
I have the following table:

DEST_COUNTRY_NAME   ORIGIN_COUNTRY_NAME count
United States       Romania             15
United States       Croatia             1
United States       Ireland             344
Egypt               United States       15  

The table is represented as a Dataset.

scala> dataDS
res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

I am able to sort the entries as a batch process.

scala> dataDS.sort(col("count")).show(100);

I now want to see whether I can do the same using streaming. To do this, I suppose I will have to read the file as a stream.

scala> val staticSchema = dataDS.schema;
staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,IntegerType,true))

scala> val dataStream = spark.
     | readStream.
     | schema(staticSchema).
     | option("header","true").
     | csv("data/flight-data/csv/2015-summary.csv");
dataStream: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> dataStream.isStreaming;
res245: Boolean = true

But I am not sure how to proceed from here and actually read the data as a stream.

I have applied the sort transformation:

scala> dataStream.sort(col("count"));
res246: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

I suppose now I should use Dataset's writeStream method. I ran the following two commands but both returned errors.

scala> dataStream.sort(col("count")).writeStream.
     | format("memory").
     | queryName("sorted_data").
     | outputMode("complete").
     | start();
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

and this one

scala> dataStream.sort(col("count")).writeStream.
     | format("memory").
     | queryName("sorted_data").
     | outputMode("append").
     | start();
org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

From the errors, it seems I should be aggregating (grouping) the data, but I thought I don't need to do it as I can run any batch operation as a stream.

How can I understand how to sort data which arrives as a stream?

halfer
Manu Chadha

1 Answer


Unfortunately what the error messages tell you is accurate.

The point you make:

but I thought I don't need to do it as I can run any batch operation as a stream.

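has some merit, but it misses a fundamental point: Structured Streaming is not tightly bound to micro-batching. A global sort needs the complete input, and an unbounded stream never provides it, which is why sorting is only allowed on an aggregated result in Complete output mode.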

One could easily come up with some unscalable hack:

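import org.apache.spark.sql.functions._
import spark.implicits._  // For the $"..." column syntax (already in scope in spark-shell)

dataStream
  .withColumn("time", window(current_timestamp(), "5 minutes"))  // Some time window
  .withWatermark("time", "0 seconds")  // Immediate watermark
  .groupBy("time")
  // Collect each window's rows into one array and sort it; "count" is the first struct field, so it acts as the sort key
  .agg(sort_array(collect_list(struct($"count", $"DEST_COUNTRY_NAME", $"ORIGIN_COUNTRY_NAME"))).as("data"))
  .withColumn("data", explode($"data"))
  .select($"data.*")
  .select(dataStream.columns.map(col): _*)  // Restore the original column order
  .writeStream
  .outputMode("append")
   ...
  .start()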
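
For contrast, the case that the second error message does allow, sorting an aggregated streaming Dataset in Complete output mode, might look like the sketch below. It is only an illustration under a few assumptions: the file path and schema are taken from the question, while the aggregation (total count per destination) and the names `flights`, `totalsByDestination`, and `sorted_totals` are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// In spark-shell a session already exists and getOrCreate simply returns it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sorted-stream-sketch")  // hypothetical application name
  .getOrCreate()

// Same schema as the one printed for dataDS.schema in the question.
val flightSchema = StructType(Seq(
  StructField("DEST_COUNTRY_NAME", StringType, nullable = true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, nullable = true),
  StructField("count", IntegerType, nullable = true)))

// Path assumed from the question.
val flights = spark.readStream
  .schema(flightSchema)
  .option("header", "true")
  .csv("data/flight-data/csv/2015-summary.csv")

// Illustrative aggregation: total count per destination, largest first.
// The sort is accepted here because it follows a streaming aggregation.
val totalsByDestination = flights
  .groupBy(col("DEST_COUNTRY_NAME"))
  .agg(sum(col("count")).as("total"))
  .sort(col("total").desc)

val query = totalsByDestination.writeStream
  .format("memory")
  .queryName("sorted_totals")
  .outputMode("complete")  // the whole result table is rewritten on every trigger
  .start()

query.processAllAvailable()  // block until the input available so far has been processed
spark.sql("SELECT * FROM sorted_totals").show(5)
query.stop()
```

In Complete mode the entire result is recomputed and rewritten on each trigger, which is what makes a global ordering meaningful; it also means the aggregated state has to stay small enough to keep around. The order in which rows come back from the in-memory table is not something to rely on, so add an `ORDER BY` to the final query if the order matters there.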