
Context - Application

We have an Apache Flink application that processes events:

  • The application uses event time characteristics
  • The application shards (keyBy) events based on the sessionId field
  • The application has windowing with 1 minute tumbling window
    • The windowing is specified by a reduce and a process function
    • So, for each session we get one computed record per window
  • The application emits the data into a Postgres sink
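To make the windowing semantics concrete, here is a plain-Java model of what the pipeline computes. It is not Flink code. Events are grouped by sessionId and by the start of their 1-minute tumbling window, and each group is reduced to a single record. The Event record, its value field, and summing as the reduce step are assumptions for illustration. Watermarks, allowed lateness, and the process function are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SessionWindowModel {
    // Hypothetical event type: only the fields this model needs.
    record Event(String sessionId, long timestampMs, long value) {}

    static final long WINDOW_SIZE_MS = 60_000L; // 1 minute tumbling window

    // Start of the tumbling window that contains the timestamp (no offset).
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Groups events by (sessionId, window start) and reduces each group to one sum,
    // mimicking keyBy(sessionId) + tumbling window + reduce.
    static Map<String, Long> aggregate(List<Event> events) {
        Map<String, Long> result = new TreeMap<>();
        for (Event e : events) {
            String key = e.sessionId() + "@" + windowStart(e.timestampMs());
            result.merge(key, e.value(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("s1", 1_000, 2),
            new Event("s1", 59_000, 3),
            new Event("s2", 30_000, 5),
            new Event("s1", 61_000, 7));
        System.out.println(aggregate(events));
    }
}
```

Each key of the resulting map corresponds to one record the sink would receive: one per session per window.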

Context - Infrastructure

Application:

  • It is hosted in AWS via Kinesis Data Analytics (KDA)
  • It is running in 5 different regions
  • The exact same code is running in each region

Database:

  • It is hosted in AWS via RDS (currently it is a PostgreSQL)
  • It is located in one region (with a read replica in a different region)

Problem

Because we use event time with a 1-minute tumbling window, the sinks in all regions emit their records at nearly the same time, so the single database receives the writes from all five regions at once.

Current State

What we want to achieve is to add an artificial delay between the window and sink operators to postpone the sink's emission, so that each region's sink emits at a different time.

Desired State

Flink App   Offset (s)   Window 1 (s)   Sink 1st run (s)   Window 2 (s)   Sink 2nd run (s)
#1          0            60             60                 120            120
#2          12           60             72                 120            132
#3          24           60             84                 120            144
#4          36           60             96                 120            156
#5          48           60             108                120            168
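The Desired State table follows a simple rule: each app's sink runs at the window end plus a per-app offset, and the offsets appear to step by 12 s (a 60 s window divided across 5 regions). A minimal sketch of that arithmetic, assuming 1-based app and window numbers; the class and method names are hypothetical:

```java
public class SinkSchedule {
    static final long WINDOW_SIZE_S = 60; // 1 minute tumbling window
    static final long STEP_S = 12;        // per-app offset step read off the table (0, 12, 24, 36, 48)

    // Offset of app #appNumber (1-based), in seconds.
    static long offsetSeconds(int appNumber) {
        return (appNumber - 1) * STEP_S;
    }

    // Time at which app #appNumber's sink should run for window #windowNumber (1-based).
    static long sinkRunSeconds(int appNumber, int windowNumber) {
        long windowEnd = windowNumber * WINDOW_SIZE_S;
        return windowEnd + offsetSeconds(appNumber);
    }

    public static void main(String[] args) {
        for (int app = 1; app <= 5; app++) {
            System.out.printf("#%d offset=%d sink1=%d sink2=%d%n",
                app, offsetSeconds(app), sinkRunSeconds(app, 1), sinkRunSeconds(app, 2));
        }
    }
}
```

Running main reproduces every row of the table.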

Work-around that did not work

We thought we could add a sleep to the evictor's evictBefore method, like this:

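import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

...
.keyBy(event -> event.getSessionId())
.window(getWindowAssigner(config))
.allowedLateness(Time.seconds(config.getWindowLatenessInSec()))
.evictor(new Evictor<Event, TimeWindow>() {
    private static final long serialVersionUID = 5373966807521260856L;

    // Called before the window function runs; the sleep is the attempted delay
    @Override
    public void evictBefore(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
        try {
            Thread.sleep(config.getWindowingDelayInMilliSec());
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
        }
    }

    // Nothing is evicted after the window function runs
    @Override
    public void evictAfter(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
    }
})
...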

but it does not work reliably.
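One likely reason the sleep is unreliable: Flink calls evictBefore synchronously on the operator's task thread each time a window fires for a key, so the sleep blocks every other key (and checkpoint barriers) on that parallel instance, and the delays add up across all sessions whose windows fire on the same watermark. The arithmetic below assumes that sequential behavior; the sleep value and session count are hypothetical.

```java
public class EvictorSleepModel {
    // Delay (ms) after the window fires until the k-th key (0-based) emits, assuming
    // each firing sleeps sleepMs on the same task thread before its window function runs.
    static long emissionDelayMs(long sleepMs, int k) {
        return (k + 1) * sleepMs;
    }

    // Total time the parallel instance is blocked when n keys fire on the same watermark.
    static long totalStallMs(long sleepMs, int n) {
        return n * sleepMs;
    }

    public static void main(String[] args) {
        long sleepMs = 12_000L;     // hypothetical configured windowing delay
        int sessionsFiring = 1_000; // hypothetical number of sessions per parallel instance
        System.out.println("first session emits after " + emissionDelayMs(sleepMs, 0) + " ms");
        System.out.println("last session emits after "
            + emissionDelayMs(sleepMs, sessionsFiring - 1) + " ms");
        System.out.println("instance blocked for " + totalStallMs(sleepMs, sessionsFiring) + " ms");
    }
}
```

Under these assumptions only the first session gets the intended 12 s delay; with 1,000 sessions the last one would be emitted about 12,000 s after its window closed.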

Peter Csala

1 Answer


You could use TumblingEventTimeWindows.of(Time size, Time offset, WindowStagger windowStagger) with WindowStagger.RANDOM.

See https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html for documentation.
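A plain-Java sketch (not Flink code) of what a random stagger does to window boundaries: one offset is drawn uniformly from [0, window size) and every window boundary is shifted by it. According to the WindowStagger Javadoc linked above, RANDOM samples this offset when the first event reaches the partitioned operator, so the shift is random per parallel instance rather than a fixed per-region value like the ones in the Desired State table. The helper names are hypothetical.

```java
import java.util.Random;

public class RandomStaggerModel {
    // Start of the tumbling window of length sizeMs that contains timestampMs,
    // with all window boundaries shifted by offsetMs.
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs - offsetMs, sizeMs);
    }

    // Stand-in for a random stagger: one offset drawn uniformly from [0, sizeMs).
    static long randomStagger(Random rnd, long sizeMs) {
        return rnd.nextInt(Math.toIntExact(sizeMs));
    }

    public static void main(String[] args) {
        long sizeMs = 60_000L;       // 1 minute tumbling window
        long eventTs = 90_000L;      // hypothetical event timestamp
        Random rnd = new Random(42); // fixed seed so the run is repeatable
        for (int instance = 1; instance <= 5; instance++) {
            long stagger = randomStagger(rnd, sizeMs);
            long start = windowStart(eventTs, stagger, sizeMs);
            System.out.printf("instance %d: stagger=%d ms, window [%d, %d)%n",
                instance, stagger, start, start + sizeMs);
        }
    }
}
```

Each instance still produces 1-minute windows, but their end times (and therefore the sink writes) are spread across the minute.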

David Anderson
  • Hi David, how does it help to set a specific offset? – Peter Csala Mar 07 '22 at 16:46
  • Maybe `TumblingEventTimeWindows.of(Time.seconds(config.getWindowSizeInSec()), Time.seconds(config.getWindowDelayInSec()))` would also be fine for us... Let me test it and I'll update you. – Peter Csala Mar 09 '22 at 11:26
  • It seems like it is also fine for us if we shift the windows. That way we don't have to put artificial delays between the window and sink operators. – Peter Csala Mar 10 '22 at 15:21
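The fixed-offset variant from the comments shifts every window boundary by the configured offset, so a region with a 12 s offset aggregates over [12, 72) and fires at 72 s instead of 60 s. The sketch below reproduces that boundary arithmetic in plain Java (seconds instead of milliseconds) with the offsets from the Desired State table. In the Flink job the offset would be the second argument of TumblingEventTimeWindows.of, as in the config.getWindowDelayInSec() call from the comment. The class and method names are hypothetical.

```java
public class OffsetWindowModel {
    static final long SIZE_S = 60; // 1 minute tumbling window

    // Start of the tumbling window that contains ts, with boundaries shifted by offset (seconds).
    static long windowStart(long ts, long offset) {
        return ts - Math.floorMod(ts - offset, SIZE_S);
    }

    // The window fires (and the sink receives its record) once the watermark passes this time.
    static long windowEnd(long ts, long offset) {
        return windowStart(ts, offset) + SIZE_S;
    }

    public static void main(String[] args) {
        long[] regionOffsets = {0, 12, 24, 36, 48}; // offsets from the Desired State table
        long eventTs = 50;                          // hypothetical event timestamp in seconds
        for (int i = 0; i < regionOffsets.length; i++) {
            System.out.printf("region #%d: window [%d, %d)%n", i + 1,
                windowStart(eventTs, regionOffsets[i]), windowEnd(eventTs, regionOffsets[i]));
        }
    }
}
```

The window ends match the "Sink 1st run" column (60, 72, 84, 96, 108). The trade-off is that each region now groups a different slice of time (region #2 covers 12 s to 72 s rather than 0 s to 60 s), which is what the last comment accepts.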