java apache-flink stream-processing amazon-kinesis-analytics

How to inject delay between the window and sink operator?


Context - Application

We have an Apache Flink application which processes events.

Context - Infrastructure

Application:

Database:

Problem

Because we use event time with a 1-minute tumbling window, the sinks in all regions emit their records at nearly the same time.

Current State

What we want to achieve is to add an artificial delay between the window and sink operators to postpone the sink emission.

Desired State

Flink App | Offset | Window 1 | Sink 1st run | Window 2 | Sink 2nd run
#1        | 0      | 60       | 60           | 120      | 120
#2        | 12     | 60       | 72           | 120      | 132
#3        | 24     | 60       | 84           | 120      | 144
#4        | 36     | 60       | 96           | 120      | 156
#5        | 48     | 60       | 108          | 120      | 168
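
The schedule in the table can be reproduced with a little arithmetic. The following is only a sketch in plain Java (no Flink dependency), assuming that the five applications are spread evenly across one 60-unit window, which is what the 12-unit steps in the table suggest. The class and method names (StaggerSchedule, offsetSeconds, sinkEmissionSeconds) are hypothetical and exist only for illustration.

```java
/** Hypothetical helper that reproduces the "Desired State" table. */
final class StaggerSchedule {

    /**
     * Offset for the application at zero-based index appIndex when appCount
     * applications are spread evenly across one window (assumption).
     */
    static long offsetSeconds(int appIndex, int appCount, long windowSeconds) {
        return appIndex * windowSeconds / appCount;
    }

    /** Time at which the sink should emit the results of window number windowNumber (1-based). */
    static long sinkEmissionSeconds(int appIndex, int appCount, long windowSeconds, int windowNumber) {
        return windowNumber * windowSeconds + offsetSeconds(appIndex, appCount, windowSeconds);
    }

    public static void main(String[] args) {
        int appCount = 5;        // number of rows in the table
        long windowSeconds = 60; // 1-minute tumbling window
        for (int i = 0; i < appCount; i++) {
            System.out.println("#" + (i + 1)
                    + " offset=" + offsetSeconds(i, appCount, windowSeconds)
                    + " sink1=" + sinkEmissionSeconds(i, appCount, windowSeconds, 1)
                    + " sink2=" + sinkEmissionSeconds(i, appCount, windowSeconds, 2));
        }
    }
}
```

Each application would only need to know its own index and the total number of applications to derive its offset.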

Not working work-around

We thought we could add a sleep to the evictor's evictBefore method, like this:

...
.keyBy(event -> event.getSessionId())
.window(getWindowAssigner(config))
.allowedLateness(Time.seconds(config.getWindowLatenessInSec()))
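.evictor(new Evictor<>() {
    private static final long serialVersionUID = 5373966807521260856L;

    @Override
    public void evictBefore(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
        // Attempted workaround: block the operator thread before the window function runs.
        try {
            Thread.sleep(config.getWindowingDelayInMilliSec());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that task cancellation is not swallowed.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
        // Intentionally empty: nothing is evicted.
    }
})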
...

but it does not work reliably.


Solution

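  • You could use TumblingEventTimeWindows.of(Time size, Time offset, WindowStagger windowStagger) with WindowStagger.RANDOM, which staggers the window boundaries by a random offset so that the windows do not all fire at the same moment.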

    See https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html for documentation.
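
    To see why an offset spreads out the emission, here is a minimal plain-Java sketch of how an offset shifts tumbling window boundaries. It is not Flink's own code, and the names OffsetWindowSketch, windowStart and windowEnd are hypothetical. An event-time window fires once the watermark passes its end, so shifting the end shifts the emission. If a deterministic per-application offset is preferred over a random one, the same effect should be achievable by passing a fixed offset to TumblingEventTimeWindows.of(Time size, Time offset).

```java
/** Hypothetical sketch of tumbling window assignment with an offset. */
final class OffsetWindowSketch {

    /** Start of the tumbling window that contains timestampMs, with boundaries shifted by offsetMs. */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        // floorMod keeps the remainder non-negative even for timestamps before the offset.
        long remainder = Math.floorMod(timestampMs - offsetMs, sizeMs);
        return timestampMs - remainder;
    }

    /** Exclusive end of that window; the window can fire once the watermark reaches it. */
    static long windowEnd(long timestampMs, long offsetMs, long sizeMs) {
        return windowStart(timestampMs, offsetMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 60_000;     // 1-minute window
        long timestampMs = 30_000; // example event timestamp
        for (long offsetMs = 0; offsetMs < sizeMs; offsetMs += 12_000) {
            System.out.println("offset=" + offsetMs
                    + " window=[" + windowStart(timestampMs, offsetMs, sizeMs)
                    + ", " + windowEnd(timestampMs, offsetMs, sizeMs) + ")");
        }
    }
}
```

    Note that an offset moves the window boundaries themselves, not only the sink emission, so the events that fall into each window differ from application to application. That is slightly different from the table in the question, where every window still ends at 60 and only the sink is delayed.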