My use case
- The input is a stream of raw events keyed by an ID.
- I'd like to count the total number of events over the past 7 days for each ID.
- The output should be updated every 10 minutes.
- Logically, this is a sliding window with a size of 7 days and a slide of 10 minutes.
This post lays out a good optimization: pre-aggregate with a tumbling window of 1 day.
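To make the pre-aggregation idea concrete, here is a minimal sketch in plain Scala collections (no Flink involved). The `Event` type and both function names are hypothetical, and the second stage only resolves to whole days, so it illustrates the two-stage counting rather than the exact 10-minute output.

```scala
object DailyPreAggregation {
  val DayMs: Long = 24L * 60 * 60 * 1000

  // Hypothetical event type: an ID plus an event-time timestamp in milliseconds.
  final case class Event(id: String, timestampMs: Long)

  /** Stage 1: one partial count per (id, day index), like a 1-day tumbling window. */
  def dailyCounts(events: Seq[Event]): Map[(String, Long), Long] =
    events
      .groupBy(e => (e.id, Math.floorDiv(e.timestampMs, DayMs)))
      .map { case (key, es) => key -> es.size.toLong }

  /** Stage 2: total for `id` over the 7 days ending with `lastDay` (inclusive). */
  def sevenDayTotal(daily: Map[(String, Long), Long], id: String, lastDay: Long): Long =
    (lastDay - 6 to lastDay).map(day => daily.getOrElse((id, day), 0L)).sum
}
```

The point of the first stage is that the second stage sums at most 7 partial counts per ID instead of touching every raw event again.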
So my logic would look like this:
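env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Stage 1: one count per key per day (tumbling window)
val oneDayCounts = joins
  .keyBy(keyFunction)
  .map(t => (t.key, 1L, t.timestampMs))
  .keyBy(0)
  .timeWindow(Time.days(1))
  .sum(1) // add up the per-event 1L values into a daily count

// Stage 2: 7-day sliding window over the daily counts, advancing every 10 minutes
val sevenDayCounts = oneDayCounts
  .keyBy(0)
  .timeWindow(Time.days(7), Time.minutes(10))
  .sum(1)

// single reducer
sevenDayCounts
  .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(10)))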
P.S. Please ignore the performance concern of the single reducer.
Question
If I understand correctly, however, this means a single event produces 7 * 24 * 6 = 1008 records, because a sliding window assigns each element to every window that overlaps its timestamp. So my question is: how can I reduce this sheer number of records?
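The 1008 figure can be checked with a small standalone sketch. It mirrors, as far as I understand it, how a sliding window assigner with no offset walks backwards from the latest aligned window start; `windowStarts` is a hypothetical helper written for this post, not a Flink API.

```scala
object SlidingWindowFanOut {
  val SizeMs: Long  = 7L * 24 * 60 * 60 * 1000 // 7 days
  val SlideMs: Long = 10L * 60 * 1000          // 10 minutes

  /** Start timestamps of all sliding windows that contain `ts` (assumes no offset). */
  def windowStarts(ts: Long, sizeMs: Long, slideMs: Long): Seq[Long] = {
    // Latest window start that is <= ts and aligned to the slide.
    val lastStart = ts - Math.floorMod(ts, slideMs)
    // Step back one slide at a time while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > ts - sizeMs).toSeq
  }

  def main(args: Array[String]): Unit = {
    val starts = windowStarts(1500000000000L, SizeMs, SlideMs)
    println(s"windows per element: ${starts.size}") // size / slide = 1008
  }
}
```

Since the window size is an exact multiple of the slide, every timestamp lands in size / slide windows, so the fan-out depends only on that ratio.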