I have a Spark Dataset of events, indexed by a timestamp. I would like to enrich each entry with additional information: the number of events occurring in the five minutes (300 seconds) following that event. So if the initial data consists of two columns, event_id and timestamp, I want to build a third column, counter, like below:
event_id   timestamp   counter
       0           0         4
       1         100         3
       2         150         2
       3         250         1
       4         275         0
       5         600         2
       6         610         1
       7         750         1
       8         950         2
       9        1100         1
      10        1200         0
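For reference, the sample data above can be reproduced with something like this (the SparkSession setup is my own scaffolding; only the column names event_id and timestamp come from the example):

import org.apache.spark.sql.SparkSession

// Build the example Dataset; assumes a local SparkSession named `spark`.
val spark = SparkSession.builder().master("local[*]").appName("events").getOrCreate()
import spark.implicits._

val myDataset = Seq(
  (0, 0L), (1, 100L), (2, 150L), (3, 250L), (4, 275L),
  (5, 600L), (6, 610L), (7, 750L), (8, 950L),
  (9, 1100L), (10, 1200L)
).toDF("event_id", "timestamp")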
I know that in Spark I can use a window function to count future events within a window of fixed size in terms of the number of rows:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// rowsBetween defines the frame in ROWS: the current row plus the
// 300 rows that follow it, regardless of their timestamps.
val window = Window.orderBy('timestamp).rowsBetween(0, 300)
myDataset.withColumn("count_future_events", sum(lit(1)).over(window))
But this is not what I want: because the frame is defined in rows, the count is trivially the same for every row (301 here, except near the end of the dataset).
I wish something like this existed:
val window = Window.orderBy('timestamp).rowsBetween('timestamp, 'timestamp + 300) // 300 seconds here
But this does not compile, since rowsBetween only accepts plain Long row offsets, not Columns.
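Looking at the Window API, rangeBetween seems to be the value-based counterpart of rowsBetween: as far as I can tell, its offsets are interpreted in the units of the ORDER BY expression rather than as row counts. A sketch of what I would try, assuming timestamp is a numeric column holding seconds:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// rangeBetween defines the frame on the VALUES of the ORDER BY column:
// all rows whose timestamp lies in [current, current + 300] seconds.
// The current row falls inside its own frame, hence the "- 1".
val byTimeRange = Window.orderBy(col("timestamp")).rangeBetween(0, 300)
val enriched = myDataset.withColumn("counter", count(lit(1)).over(byTimeRange) - 1)

On the sample data this would give counter = 4 for event 0 (events 1 through 4 fall in [0, 300]) and 0 for event 10, matching the table above.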
Is something like that the right approach, or is there another way to achieve what I want?