To do stateless aggregation in Spark Structured Streaming 2.3.0, without using `flatMapGroupsWithState` or the DStream API, you can use the following code:
```scala
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

def countValues = (_: String, it: Iterator[(String, String)]) => it.length

val query =
  dataStream
    .select(lit("a").as("newKey"), col("value"))
    .as[(String, String)]
    .groupByKey { case (newKey, _) => newKey }
    .mapGroups[Int](countValues)
    .writeStream
    .format("console")
    .start()
```
Here is what we are doing:
- We added one column, `newKey`, to our `dataStream`. We did this so that we can do a groupBy over it, using `groupByKey`. I have used the literal string `"a"`, but you can use anything. Also, you need to select any one column from the available columns in `dataStream`; I have selected the `value` column for this purpose, but you can select any one.
- We created a mapping function, `countValues`, to count the values gathered by `groupByKey`, by calling `it.length` on the group's iterator.
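To see what the mapping step does in isolation, here is a minimal plain-Scala sketch (no Spark needed) of the same `countValues` shape, run over a hypothetical in-memory batch where every row carries the literal key `"a"`:

```scala
object CountValuesSketch extends App {
  // Hypothetical sample batch, shaped like the (newKey, value) tuples above.
  val batch = Seq(("a", "v1"), ("a", "v2"), ("a", "v3"))

  // Same signature mapGroups expects: (key, iterator of rows) => result.
  def countValues(key: String, it: Iterator[(String, String)]): Int = it.length

  // Group by the key, then count each group, mirroring groupByKey + mapGroups.
  val counts: Seq[(String, Int)] =
    batch.groupBy(_._1).toSeq.map { case (k, rows) => (k, countValues(k, rows.iterator)) }

  println(counts) // one entry per key, with the number of rows in that group
}
```

Since every row was given the same key, the whole batch collapses into a single group, and the count is simply the batch size.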
So, in this way, we can count the records available in each batch, without carrying the aggregation over from previous batches.
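If you want to try the query end to end, here is a runnable sketch, assuming a local SparkSession and using the built-in `rate` source as a stand-in for `dataStream` (the object and app names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object StatelessCountExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stateless-count")
      .getOrCreate()
    import spark.implicits._

    // The rate source emits (timestamp, value) rows; cast value to string
    // so the stream matches the (String, String) shape used in the answer.
    val dataStream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(col("value").cast("string").as("value"))

    def countValues = (_: String, it: Iterator[(String, String)]) => it.length

    val query = dataStream
      .select(lit("a").as("newKey"), col("value"))
      .as[(String, String)]
      .groupByKey { case (newKey, _) => newKey }
      .mapGroups[Int](countValues)
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Each micro-batch should print a single count to the console, covering only the rows that arrived in that batch.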
I hope it helps!