We have an Apache Flink application which processes events. It shards (`keyBy`) events based on the `sessionId` field and uses a `reduce` and a `process` function.

Application:

Database:
Because we use event time with a 1-minute tumbling window, the sinks in all regions emit their records at nearly the same time.
We want to add an artificial delay between the window and sink operators to postpone sink emission, staggered per application as follows (all times in seconds):
Flink App | Offset | Window 1 | Sink 1st run | Window 2 | Sink 2nd run |
---|---|---|---|---|---|
#1 | 0 | 60 | 60 | 120 | 120 |
#2 | 12 | 60 | 72 | 120 | 132 |
#3 | 24 | 60 | 84 | 120 | 144 |
#4 | 36 | 60 | 96 | 120 | 156 |
#5 | 48 | 60 | 108 | 120 | 168 |
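The Sink columns follow from plain window arithmetic: with a 60 s tumbling window and a per-app offset, the n-th window ends at `offset + n * 60`. The plain-Java sketch below (no Flink dependency) reproduces the table. `windowStart` mirrors, to our understanding, the formula in Flink's `TimeWindow.getWindowStartWithOffset`; the class name and the mapping "offset = 0-based app index × 12 s" are illustrative assumptions. Note that an offset moves the window boundaries rather than delaying an aligned window: app #2's first window covers [12 s, 72 s), not [0 s, 60 s).

```java
/** Hypothetical helper reproducing the offset/sink table (all times in seconds). */
final class WindowOffsetTable {
    static final long SIZE = 60;        // 1-minute tumbling window
    static final long OFFSET_STEP = 12; // assumption: offset = appIndex * 12 s

    /** Start of the window containing `timestamp`, assuming Flink-style offset semantics. */
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, size)
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    /** Exclusive end of the n-th window (n = 1, 2, ...) for the app with the given 0-based index. */
    static long nthWindowEnd(int appIndex, int n) {
        long offset = appIndex * OFFSET_STEP;
        // an event at offset + (n - 1) * SIZE is the first one that falls into the n-th window
        long start = windowStart(offset + (n - 1) * SIZE, offset, SIZE);
        return start + SIZE; // the sink can emit once the watermark passes this point
    }

    public static void main(String[] args) {
        for (int app = 0; app < 5; app++) {
            System.out.printf("#%d offset=%d sink1=%d sink2=%d%n",
                    app + 1, app * OFFSET_STEP, nthWindowEnd(app, 1), nthWindowEnd(app, 2));
        }
    }
}
```

In Flink itself, the same fixed offset would be expressed with the two-argument overload, e.g. `TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(12 * regionIndex))`, where `regionIndex` is a hypothetical per-region configuration value.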
We thought we could add a sleep to the evictor's `evictBefore` method, like this:
```java
...
.keyBy(event -> event.getSessionId())
.window(getWindowAssigner(config))
.allowedLateness(Time.seconds(config.getWindowLatenessInSec()))
.evictor(new Evictor<>() {
    private static final long serialVersionUID = 5373966807521260856L;

    @Override
    public void evictBefore(Iterable<TimestampedValue<Event>> iterable, int size,
                            TimeWindow timeWindow, EvictorContext evictorContext) {
        // Runs before the window function: block the task thread to postpone emission
        try {
            Thread.sleep(config.getWindowingDelayInMilliSec());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Event>> iterable, int size,
                           TimeWindow timeWindow, EvictorContext evictorContext) {
        // nothing to evict after the window function
    }
})
...
```
However, this does not work reliably.
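One likely reason, based on our reading of how Flink runs evictors rather than on anything verified in this application: `evictBefore` is called once per key and window, on the window operator's single task thread, right before that key's window function runs. When many `sessionId` keys fire on the same watermark, the sleeps run one after another, so the delay grows with the number of firing keys instead of being a fixed offset, and meanwhile the thread cannot process records, watermarks or checkpoint barriers. The plain-Java simulation below (hypothetical names, no Flink) only makes that arithmetic concrete.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation: one task thread firing many keyed windows, sleeping in each evictBefore. */
final class SerialSleepSimulation {
    /** Time (ms after the watermark) at which each key's window result is emitted downstream. */
    static List<Long> emissionOffsetsMs(int firingKeys, long sleepPerCallMs) {
        List<Long> offsets = new ArrayList<>();
        long clock = 0; // simulated time on the single task thread
        for (int key = 0; key < firingKeys; key++) {
            clock += sleepPerCallMs; // evictBefore sleeps before this key's window function runs
            offsets.add(clock);      // the key's result is emitted only after that sleep
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Long> offsets = emissionOffsetsMs(1_000, 12_000);
        // the first key is delayed 12 s, the last one 1,000 * 12 s
        System.out.println("first=" + offsets.get(0) + " ms, last="
                + offsets.get(offsets.size() - 1) + " ms");
    }
}
```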
You could use `TumblingEventTimeWindows.of(Time size, Time offset, WindowStagger windowStagger)` with `WindowStagger.RANDOM`.
See https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html for documentation.
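As we understand the Flink source, a staggered `TumblingEventTimeWindows` computes one stagger offset per parallel assigner instance (lazily, when it assigns its first element) and adds it to the fixed offset before computing window starts; `WindowStagger.ALIGNED` yields 0 and `WindowStagger.RANDOM` a value in [0, size). The sketch below emulates that in plain Java; it is not Flink code, and the class, enum and method names are hypothetical. Because the offset is random per subtask, `RANDOM` spreads sink emissions across the minute but does not reproduce the fixed 12-second schedule from the question; for that, a per-region fixed offset is the deterministic option.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical plain-Java emulation of aligned vs. random window staggering (times in ms). */
final class StaggerSketch {
    enum Stagger { ALIGNED, RANDOM }

    /** Stagger offset in [0, size), assumed to be computed once per assigner instance. */
    static long staggerOffset(Stagger stagger, long size) {
        switch (stagger) {
            case ALIGNED:
                return 0L;
            case RANDOM:
                return (long) (ThreadLocalRandom.current().nextDouble() * size);
            default:
                throw new IllegalArgumentException("unknown stagger " + stagger);
        }
    }

    /** Window start for `timestamp`, combining the fixed offset with the stagger offset. */
    static long windowStart(long timestamp, long offset, long staggerOffset, long size) {
        long effectiveOffset = (offset + staggerOffset) % size;
        long remainder = (timestamp - effectiveOffset) % size;
        // shift negative remainders (timestamps before the offset) into [0, size)
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 60_000; // 1-minute tumbling window
        for (int subtask = 0; subtask < 3; subtask++) {
            long stagger = staggerOffset(Stagger.RANDOM, size); // differs per simulated subtask
            long end = windowStart(90_000, 0, stagger, size) + size;
            System.out.printf("subtask %d: stagger=%d ms, window for t=90s ends at %d ms%n",
                    subtask, stagger, end);
        }
    }
}
```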