
How can I do stateless aggregations in Spark with Structured Streaming 2.3.0, without using flatMapGroupsWithState or the DStream API? I am looking for a more declarative way.

Example:

select count(*) from some_view

I want the output to count only the records available in each batch, not an aggregate carried over from previous batches.

zero323
user1870400

1 Answer


To do stateless aggregations in Spark using Structured Streaming 2.3.0, without using flatMapGroupsWithState or the DStream API, you can use the following code:

import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

def countValues = (_: String, it: Iterator[(String, String)]) => it.length

val query =
  dataStream
    .select(lit("a").as("newKey"), col("value"))
    .as[(String, String)]
    .groupByKey { case(newKey, _) => newKey }
    .mapGroups[Int](countValues)
    .writeStream
    .format("console")
    .start()

Here what we are doing is-

  1. We add one column, newKey, to our dataStream so that we can group over it using groupByKey. I have used the literal string "a", but any constant value will do. You also need to select one of the existing columns of dataStream; I have selected the value column here, but any column works.
  2. We define a mapping function, countValues, which counts the rows in each group produced by groupByKey by calling it.length on the group's iterator.
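The grouping logic above can be sketched on a plain Scala collection, independent of Spark. Here countValues has the same shape as in the streaming query, while the sample batch and the groupBy/map combination are just an illustration of what groupByKey and mapGroups do per micro-batch:

```scala
// Same shape as the mapping function used in the streaming query:
// the key is ignored, only the group's rows are counted.
val countValues = (_: String, it: Iterator[(String, String)]) => it.length

// A hypothetical micro-batch: every row carries the same constant key "a".
val batch = Seq(("a", "v1"), ("a", "v2"), ("a", "v3"))

// groupByKey + mapGroups is mimicked with groupBy + map on a collection.
val counts = batch
  .groupBy { case (newKey, _) => newKey }
  .map { case (key, rows) => countValues(key, rows.iterator) }
```

Because every row shares the constant key, there is exactly one group per batch, and its count equals the batch size.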

In this way, we count only the records available in each batch, without aggregating over previous batches.

I hope it helps!

himanshuIIITian