
My use case

  • The input is raw events keyed by an ID.
  • I'd like to count the total number of events over the past 7 days for each ID.
  • The output should be emitted every 10 minutes.
  • Logically, this is a sliding window with a size of 7 days and a slide of 10 minutes.
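To pin down the required output, here is a brute-force reference in plain Scala (not Flink). For a given window end, it counts each ID's events in the half-open interval [end - 7 days, end). The `Event` type and the `countsAt` helper are hypothetical names used only for illustration, and window ends are assumed to fall on 10-minute boundaries.

```scala
object NaiveSlidingCount {
  // Hypothetical event type: an ID plus an event-time timestamp in milliseconds.
  final case class Event(id: String, timestampMs: Long)

  val SizeMs: Long  = 7L * 24 * 60 * 60 * 1000 // 7-day window size
  val SlideMs: Long = 10L * 60 * 1000          // 10-minute slide

  // Counts, per ID, the events whose timestamp lies in [windowEnd - SizeMs, windowEnd).
  def countsAt(events: Seq[Event], windowEnd: Long): Map[String, Long] =
    events
      .filter(e => e.timestampMs >= windowEnd - SizeMs && e.timestampMs < windowEnd)
      .groupBy(_.id)
      .map { case (id, es) => id -> es.size.toLong }

  def main(args: Array[String]): Unit = {
    val day = 24L * 60 * 60 * 1000
    val events = Seq(Event("a", 0L), Event("a", 3 * day), Event("b", 8 * day))
    val counts = countsAt(events, 8 * day + SlideMs)
    println(counts("a")) // 1: the event for "a" at time 0 is older than 7 days
    println(counts("b")) // 1
  }
}
```

Evaluating this for every ID at every 10-minute tick is exactly the output the sliding window is meant to produce; the question is how to get it without touching each event 1008 times.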

This post describes a good optimization: pre-aggregating with a tumbling window of 1 day.

So my logic would look like this:

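import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Pre-aggregate: one (key, count, timestamp) record per key per day.
// Assumes `joins` already carries event-time timestamps and watermarks.
val oneDayCounts = joins
  .map(t => (t.key, 1L, t.timestampMs))
  .keyBy(0)
  .timeWindow(Time.days(1))
  .sum(1) // a window needs an aggregation before its result can be keyed again

// 7-day sliding window over the daily counts, advancing every 10 minutes.
val sevenDayCounts = oneDayCounts
  .keyBy(0)
  .timeWindow(Time.days(7), Time.minutes(10))
  .sum(1)

// Single reducer: collects all keys' results every 10 minutes.
// A window function (e.g. .process(...) or .reduce(...)) still has to be applied here.
sevenDayCounts
  .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(10)))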

P.S. Ignore the performance concern of the single reducer for now.

Question

If I understand correctly, however, this means a single event ends up in 7 × 24 × 6 = 1008 records, because a 7-day window sliding every 10 minutes overlaps itself 1008 times (7 days × 24 hours × 6 ten-minute slides per hour). How can I reduce this sheer volume?
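The 1008 figure can be checked with a small plain-Scala sketch that enumerates every sliding window [start, start + size) containing a given timestamp. It is written from scratch to mirror how sliding windows are assigned (start at the latest slide boundary at or before the timestamp, then step back one slide at a time); it is not Flink's own code, and it assumes a non-negative timestamp and a zero window offset.

```scala
object SlidingAssignment {
  val SizeMs: Long  = 7L * 24 * 60 * 60 * 1000 // 7-day window size
  val SlideMs: Long = 10L * 60 * 1000          // 10-minute slide

  // Start timestamps of every window [start, start + SizeMs) that contains ts.
  // Assumes ts >= 0 and a zero window offset.
  def windowStarts(ts: Long): Vector[Long] = {
    val lastStart = ts - ts % SlideMs // latest slide boundary at or before ts
    Iterator
      .iterate(lastStart)(_ - SlideMs)        // step back one slide at a time
      .takeWhile(start => start > ts - SizeMs) // stop once the window no longer covers ts
      .toVector
  }

  def main(args: Array[String]): Unit =
    println(windowStarts(123456789L).size) // 1008 = SizeMs / SlideMs
}
```

Because the size is an exact multiple of the slide, every timestamp lands in exactly SizeMs / SlideMs = 1008 windows, which is where the per-event duplication comes from.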

del bao

1 Answer


There's a JIRA ticket, FLINK-11276, and a Google doc on the topic of doing this more efficiently.

I also recommend taking a look at this paper and talk on Efficient Window Aggregation with Stream Slicing.
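The core idea behind stream slicing can be illustrated with a simplified plain-Scala sketch (not the paper's algorithm and not a Flink API): cut time into non-overlapping 10-minute slices, keep one partial count per ID per slice, and build each 7-day result by combining the 1008 most recent slices. Each event then updates a single counter instead of being copied into 1008 windows. The class and method names are hypothetical, and the sketch assumes non-negative timestamps and ignores out-of-order data and watermarks; inside a Flink job, logic like this would typically live in a `KeyedProcessFunction` using keyed state and timers.

```scala
import scala.collection.mutable

// Hypothetical helper: per-ID slice counts for a 7-day window sliding every 10 minutes.
final class SlicedCounter {
  val SlideMs: Long = 10L * 60 * 1000                              // slice length = slide
  val SlicesPerWindow: Long = 7L * 24 * 60 * 60 * 1000 / SlideMs   // 1008 slices per window

  // id -> (slice index -> number of events in that slice)
  private val slices = mutable.Map.empty[String, mutable.Map[Long, Long]]

  // Each event increments exactly one slice counter.
  def add(id: String, timestampMs: Long): Unit = {
    val perId = slices.getOrElseUpdate(id, mutable.Map.empty[Long, Long])
    val slice = timestampMs / SlideMs
    perId(slice) = perId.getOrElse(slice, 0L) + 1L
  }

  // 7-day count for the window whose newest slice is `lastSlice`:
  // combines the partial counts of slices (lastSlice - SlicesPerWindow, lastSlice].
  def windowCount(id: String, lastSlice: Long): Long =
    slices.get(id) match {
      case None => 0L
      case Some(perId) =>
        perId.iterator
          .collect { case (s, c) if s > lastSlice - SlicesPerWindow && s <= lastSlice => c }
          .sum
    }

  // Drops slices that can no longer contribute to any window ending at or after `lastSlice`.
  def evict(lastSlice: Long): Unit =
    slices.values.foreach { perId =>
      perId --= perId.keys.filter(_ <= lastSlice - SlicesPerWindow).toList
    }
}
```

Usage: call `add` per event, and at each 10-minute boundary call `windowCount` for every ID, then `evict`. Summing 1008 small counters once per ID per emitted result replaces the 1008 per-event window copies; for an invertible aggregate such as a count, a running total (add the newest slice, subtract the one that falls out) would cut the per-result work further.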

David Anderson