I'm reading a small sample of data stored in Kafka, but instead of applying watermarks directly at the source, I do some processing on the data and then extract the event timestamps. I then apply an event-time window of 5 seconds. Most of the time the results are fine, but sometimes the window seems to be triggered (early?) and does not contain some events, even though their timestamps fall inside the window. I am only testing with a single window. See the example below; I'm trying to understand why some events whose timestamps lie inside the window bounds are not included in the window.
```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// window parameters, in seconds
int windowSize = 5;
int windowSlide = 5;
int windowOffset = 0;
int envParallelism = 5;
int maxOutOfOrderness = 5;

// read the raw JSON strings from the Kafka source (no watermarks at the source)
DataStream<String> rawStream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(1);

// some preprocessing: parse the JSON and produce Event records (inValue tuples)
DataStream<Event> eventStream = rawStream.flatMap(new JsonFlatMap());

// bounded out-of-orderness watermarks, using the event's own "time" field as its timestamp
WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(maxOutOfOrderness))
    .withTimestampAssigner(
        new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long recordTimestamp) {
                return event.time;
            }
        });

eventStream = eventStream.assignTimestampsAndWatermarks(watermarkStrategy).setParallelism(1);
eventStream.print();

// for every "eventName" (key), add "inValue" to "inValueList" and "outValue" to "outValueList"
DataStream<Model> output = eventStream
    .keyBy(new EventKeySelector("eventName"))
    .window(SlidingEventTimeWindows.of(
        Time.seconds(windowSize), Time.seconds(windowSlide),
        Time.seconds(windowOffset)))
    .process(new GraphProcessWindowFunction());
output.print();
```
This is the data stored in Kafka; there are only four keys (`eventName`):
```
{"eventName":1,"outValue":2,"time":1743663199798}
{"eventName":2,"outValue":1,"time":1743663199908}
{"eventName":2,"outValue":6,"time":1743663199911}
{"eventName":3,"outValue":6,"time":1743663199914}
{"eventName":6,"outValue":1,"time":1743663199918}
{"eventName":6,"outValue":3,"time":1743663199920}
{"eventName":2,"outValue":6,"time":1743663207928}
{"eventName":3,"outValue":6,"time":1743663207933}
```
The last two events are produced about 8000 ms later, so they are rightfully not included in the window. After processing, the “inValue” tuples are produced.
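To double-check which window each event belongs to, here is a small plain-Java sketch of the window-start arithmetic. It mirrors, for non-negative timestamps, the calculation in Flink's `TimeWindow.getWindowStartWithOffset`, and assumes the 5 s size/slide and 0 offset from the question; the class and method names are illustrative only.

```java
public class WindowAssignment {

    // Start of the window containing `timestamp`, for windows of `size` ms shifted by `offset` ms.
    // Plain-Java re-implementation for illustration; negative remainders are handled explicitly.
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 5_000L; // windowSize = windowSlide = 5 s
        long offset = 0L;   // windowOffset = 0
        long[] times = {
            1743663199798L, 1743663199908L, 1743663199911L, 1743663199914L,
            1743663199918L, 1743663199920L, 1743663207928L, 1743663207933L
        };
        for (long t : times) {
            long start = windowStart(t, offset, size);
            // windows are half-open: [start, start + size); maxTimestamp() is start + size - 1
            System.out.printf("%d -> [%d, %d)%n", t, start, start + size);
        }
    }
}
```

Because the slide equals the size, `SlidingEventTimeWindows` assigns each event to exactly one window here, so the six early events all land in [1743663195000, 1743663200000) and the last two in [1743663205000, 1743663210000).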
[RUN 2] Example of unexpected results I sometimes get: some events are not assigned to the window even though their timestamps fall inside the window bounds.
[RUN 3] Another example of unexpected results.
Things I have tried so far:

- When I set `.allowedLateness()` to more than 3 seconds, the "not assigned" events re-trigger the window, but I do not want to handle the window's output a second time. How do I avoid this?
- In my understanding, the problem should be fixed once I set `maxOutOfOrderness` in `forBoundedOutOfOrderness` to 5 seconds, but it seems to have no effect and I still get wrong results.
- I have also tried decreasing the watermark interval with `env.getConfig().setAutoWatermarkInterval(1);`, but that did not help either.
- I also printed `currentWatermark()` for each window, and it is always 1743663202933 for all windows, regardless of the scenario. I have never gotten a different value, despite changing the watermark interval.
- I have not tried assigning a watermark strategy directly at the Kafka source. Would that make a difference?
Flink v1.16.0, Java 11.
Setting `maxOutOfOrderness` to 5 seconds isn't large enough to account for the degree of out-of-orderness your data can have, so, depending on how the race conditions play out, some of your events can be late.
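A quick way to see why 5 seconds is not enough is to redo the watermark arithmetic. The sketch assumes Flink's bounded-out-of-orderness generator emits `maxTimestampSeen - maxOutOfOrderness - 1`, and that, with no allowed lateness, an element is dropped once the watermark has reached the window's `maxTimestamp()` (end minus 1 ms). It models the worst case in which 1743663207933 is processed before 1743663199798; the helper names are made up for illustration.

```java
public class WatermarkCheck {

    // Watermark a bounded-out-of-orderness generator emits after seeing maxSeen
    // (same formula as Flink's bounded-out-of-orderness watermarks).
    static long watermark(long maxSeen, long maxOutOfOrdernessMs) {
        return maxSeen - maxOutOfOrdernessMs - 1;
    }

    // An element for this window is late once the watermark has reached
    // the window's maxTimestamp() (end - 1) plus the allowed lateness.
    static boolean isLate(long windowEnd, long allowedLatenessMs, long currentWatermark) {
        return windowEnd - 1 + allowedLatenessMs <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowEnd = 1743663200000L; // window [1743663195000, 1743663200000)
        long wm = watermark(1743663207933L, 5_000L);
        System.out.println("watermark after 1743663207933: " + wm);
        System.out.println("event 1743663199798 late? " + isLate(windowEnd, 0L, wm));
    }
}
```

With a 5 s bound the watermark reaches 1743663202932, well past 1743663199999, so the earlier event is late. Passing an allowed lateness of 3000 ms to `isLate` returns `false` for the same watermark, which is consistent with the question's observation that a larger `allowedLateness()` makes the missing events fire the window again.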
Setting the watermark strategy at the Kafka source is recommended and will probably make a difference, depending on how your data is stored in Kafka (the number of partitions and the partitioning strategy).
If your events are stored in order in Kafka within each partition, then you can use `WatermarkStrategy.forMonotonousTimestamps()` on the Kafka source, and everything should work fine.
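The effect of per-partition watermarking can be sketched with a simplified model in plain Java. It assumes a hypothetical layout of the eight events over two Kafka partitions, each in timestamp order, with one partition read ahead of the other. Each partition gets its own monotonous watermark (max timestamp seen minus 1 ms, as with `forMonotonousTimestamps()`), and the source's watermark is the minimum over partitions; idleness handling, periodic emission and everything downstream of the source are ignored. Names are illustrative.

```java
import java.util.Arrays;

public class PerPartitionWatermarks {

    static final long WINDOW_SIZE = 5_000L;

    // End of the 5 s window (offset 0) containing ts; timestamps assumed non-negative.
    static long windowEnd(long ts) {
        return ts - (ts % WINDOW_SIZE) + WINDOW_SIZE;
    }

    // Late events (allowedLateness = 0) when the watermark is the minimum of
    // per-partition monotonous watermarks. A partition that has not produced
    // a record yet keeps the combined watermark at Long.MIN_VALUE.
    static int countLatePerPartition(int numPartitions, int[] partitions, long[] timestamps) {
        long[] partitionWm = new long[numPartitions];
        Arrays.fill(partitionWm, Long.MIN_VALUE);
        int late = 0;
        for (int i = 0; i < timestamps.length; i++) {
            long combined = Arrays.stream(partitionWm).min().getAsLong();
            if (windowEnd(timestamps[i]) - 1 <= combined) {
                late++;
            }
            int p = partitions[i];
            partitionWm[p] = Math.max(partitionWm[p], timestamps[i] - 1);
        }
        return late;
    }

    // Same arrival order, but a single monotonous generator over the merged stream.
    static int countLateSingleGenerator(long[] timestamps) {
        long wm = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestamps) {
            if (windowEnd(ts) - 1 <= wm) {
                late++;
            }
            wm = Math.max(wm, ts - 1);
        }
        return late;
    }

    public static void main(String[] args) {
        // Hypothetical layout: two in-order partitions; partition 1 is read first.
        int[] partitions = {1, 1, 1, 1, 0, 0, 0, 0};
        long[] timestamps = {
            1743663199914L, 1743663199918L, 1743663199920L, 1743663207933L,
            1743663199798L, 1743663199908L, 1743663199911L, 1743663207928L};
        System.out.println("per-partition watermarks, late: "
            + countLatePerPartition(2, partitions, timestamps));
        System.out.println("single generator on merged stream, late: "
            + countLateSingleGenerator(timestamps));
    }
}
```

In this model no event is late with per-partition watermarks, while a single monotonous generator on the merged stream drops three events, because one partition's progress pushes the watermark past events still waiting in the other partition.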
If you want to continue applying watermarks after the source, then you'll need to set `maxOutOfOrderness` to about 8 seconds, since the event at 1743663207933 can be processed before the event at 1743663199798 (events with different keys race against each other, and there's no ordering guarantee).
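To see how large the bound has to be, the sketch below replays the events in the worst-case order described above through a simplified bounded-out-of-orderness generator and counts the events that would be dropped as late. It assumes the watermark is updated after every event (as if the periodic interval were zero) and no allowed lateness; the class and method names are illustrative.

```java
import java.util.List;

public class LatenessSimulation {

    static final long WINDOW_SIZE = 5_000L;

    // End of the 5 s window (offset 0) containing ts; timestamps assumed non-negative.
    static long windowEnd(long ts) {
        return ts - (ts % WINDOW_SIZE) + WINDOW_SIZE;
    }

    // Counts events dropped as late (allowedLateness = 0) when they arrive in the given
    // order at one bounded-out-of-orderness generator that updates after every event.
    static int countLate(List<Long> arrivalOrder, long maxOutOfOrdernessMs) {
        // start value chosen so that the initial watermark is Long.MIN_VALUE
        long maxSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        int late = 0;
        for (long ts : arrivalOrder) {
            long watermark = maxSeen - maxOutOfOrdernessMs - 1;
            if (windowEnd(ts) - 1 <= watermark) {
                late++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Worst case: the newest event overtakes every event of the first window.
        List<Long> arrival = List.of(
            1743663207933L, 1743663199798L, 1743663199908L, 1743663199911L,
            1743663199914L, 1743663199918L, 1743663199920L, 1743663207928L);
        System.out.println("late with 5 s: " + countLate(arrival, 5_000L));
        System.out.println("late with 8 s: " + countLate(arrival, 8_000L));
    }
}
```

For this order, 7934 ms is the smallest bound that keeps every event on time (1743663207933 - 1743663200000 + 1), which is why a bound of about 8 seconds is needed.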