I'm testing Flink CEP with a pattern that matches a single event containing the string "fail". I send only one event and expect an immediate match. In the code below I tried two ways of assigning timestamps and watermarks: WatermarkStrategy and BoundedOutOfOrdernessTimestampExtractor. I'm using Flink 1.16.1.
With WatermarkStrategy the pattern doesn't fire immediately; it usually takes a second event to trigger it. With BoundedOutOfOrdernessTimestampExtractor it fires as expected. Here is my code:
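```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CepWatermarkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KeyedStream<Tuple3<Long, String, String>, String> stream = env
                .addSource(new SourceFunction<Tuple3<Long, String, String>>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple3<Long, String, String>> ctx) throws Exception {
                        while (running) {
                            // Emit one "fail" event with event time 1000 ms, then stay idle.
                            ctx.collect(Tuple3.of(1000L, "a", "fail"));
                            TimeUnit.SECONDS.sleep(1);
                            // ctx.collect(Tuple3.of(2000L, "a", "fail"));
                            TimeUnit.HOURS.sleep(1);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                // Option 1: WatermarkStrategy (no match until a later event arrives).
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner(
                                        (SerializableTimestampAssigner<Tuple3<Long, String, String>>) (tuple3, l) -> tuple3.f0))
                // Option 2: deprecated extractor (matches immediately). Enable only one option at a time.
                // .assignTimestampsAndWatermarks(
                //         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, String>>(Time.of(0, TimeUnit.SECONDS)) {
                //             @Override
                //             public long extractTimestamp(Tuple3<Long, String, String> element) {
                //                 return element.f0;
                //             }
                //         })
                .keyBy((KeySelector<Tuple3<Long, String, String>, String>) value -> value.f1);

        // Match a single event whose third field is "fail".
        Pattern<Tuple3<Long, String, String>, Tuple3<Long, String, String>> pattern =
                Pattern.<Tuple3<Long, String, String>>begin("first")
                        .where(new SimpleCondition<Tuple3<Long, String, String>>() {
                            @Override
                            public boolean filter(Tuple3<Long, String, String> tuple3) {
                                return tuple3.f2.equals("fail");
                            }
                        })
                        .within(Time.of(10, TimeUnit.SECONDS));

        PatternStream<Tuple3<Long, String, String>> patternStream = CEP.pattern(stream, pattern);
        patternStream.select((PatternSelectFunction<Tuple3<Long, String, String>, String>) map -> {
            System.out.println("trigger:" + map.values().size());
            return "trigger";
        }).print("warning");

        env.execute();
    }
}
```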
Why does WatermarkStrategy fail to trigger the event immediately, while BoundedOutOfOrdernessTimestampExtractor works as expected? How can I achieve immediate triggering using WatermarkStrategy?
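The difference is a "- 1". forBoundedOutOfOrderness uses BoundedOutOfOrdernessWatermarks, whose onPeriodicEmit emits new Watermark(maxTimestamp - outOfOrdernessMillis - 1). BoundedOutOfOrdernessTimestampExtractor's getCurrentWatermark, by contrast, returns new Watermark(lastEmittedWatermark), where lastEmittedWatermark is currentMaxTimestamp - maxOutOfOrderness with no "- 1". The new generator subtracts 1 because a watermark t declares that no more elements with timestamp <= t will arrive, and another event carrying the current maximum timestamp could still show up. With a single event at timestamp 1000 and zero out-of-orderness, WatermarkStrategy therefore produces a watermark of 999, while the extractor produces 1000. CEP buffers each event and only processes it once the watermark reaches the event's timestamp, so the event at 1000 stays buffered until a later event (for example the commented-out one at 2000) pushes the watermark to 1000 or beyond.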
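The arithmetic can be checked without a Flink cluster. The sketch below is a plain-Java model, not Flink code: the class and method names (WatermarkDemo, strategyWatermark, extractorWatermark, cepFires) are hypothetical, and it assumes the event-time timer rule that CEP processes a buffered event once the watermark is greater than or equal to that event's timestamp.

```java
// Hypothetical plain-Java model of the two watermark formulas; no Flink dependency.
final class WatermarkDemo {

    // Models BoundedOutOfOrdernessWatermarks#onPeriodicEmit (WatermarkStrategy.forBoundedOutOfOrderness).
    static long strategyWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Models BoundedOutOfOrdernessTimestampExtractor#getCurrentWatermark (its never-go-backwards guard omitted).
    static long extractorWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis;
    }

    // Assumed event-time timer rule: a buffered event is processed once watermark >= its timestamp.
    static boolean cepFires(long eventTimestamp, long watermark) {
        return watermark >= eventTimestamp;
    }

    public static void main(String[] args) {
        long event = 1000L; // timestamp of the single "fail" event
        long delay = 0L;    // Duration.ofSeconds(0) / Time.of(0, SECONDS)

        long wmStrategy = strategyWatermark(event, delay);
        long wmExtractor = extractorWatermark(event, delay);
        System.out.println("WatermarkStrategy watermark: " + wmStrategy + ", fires: " + cepFires(event, wmStrategy));
        System.out.println("Extractor watermark: " + wmExtractor + ", fires: " + cepFires(event, wmExtractor));

        // The commented-out second event at 2000 moves the strategy watermark to 1999.
        long wmAfterSecond = strategyWatermark(2000L, delay);
        System.out.println("After second event: " + wmAfterSecond + ", fires: " + cepFires(event, wmAfterSecond));
    }
}
```

Running it shows a watermark of 999 (no match) for the WatermarkStrategy formula, 1000 (match) for the extractor formula, and 1999 (match) once the second event arrives. To get the extractor's behavior with the new API, one option is a custom WatermarkGenerator passed to WatermarkStrategy.forGenerator(...) whose onPeriodicEmit emits maxTimestamp - outOfOrdernessMillis, i.e. the extractorWatermark formula. The trade-off is that a later event with the same timestamp as the current maximum would then sit at or below the watermark, and CEP treats such events as late.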