
Immediate CEP Event Trigger Issue with WatermarkStrategy in Flink 1.16.1


I'm testing Flink CEP with a pattern that matches a single event containing the string "fail". I send only one event and expect the match to be emitted immediately. The code below shows two ways of assigning timestamps and watermarks: WatermarkStrategy and the legacy BoundedOutOfOrdernessTimestampExtractor. I'm using Flink 1.16.1.

With WatermarkStrategy the match is not emitted immediately; it usually takes a second event before it fires. With BoundedOutOfOrdernessTimestampExtractor the match fires as expected. Here is my code:

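import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// The class name is arbitrary; it only wraps main() so the snippet compiles.
public class CepWatermarkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KeyedStream<Tuple3<Long, String, String>, String> stream = env
                .addSource(new SourceFunction<Tuple3<Long, String, String>>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple3<Long, String, String>> ctx) throws Exception {
                        while (running) {
                            // One "fail" event with event time 1000 ms, then stay idle.
                            ctx.collect(Tuple3.of(1000L, "a", "fail"));
                            TimeUnit.SECONDS.sleep(1);
                            // With variant A, uncommenting this second event lets the first one match.
                            // ctx.collect(Tuple3.of(2000L, "a", "fail"));
                            TimeUnit.HOURS.sleep(1);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                // Variant A: WatermarkStrategy (the single event does not match immediately).
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner((SerializableTimestampAssigner<Tuple3<Long, String, String>>) (tuple3, l) -> tuple3.f0))
                // Variant B: legacy, deprecated extractor (matches immediately). Use it INSTEAD of
                // variant A, not in addition: when both are chained, downstream operators only see
                // the watermarks produced by the last assigner.
                // .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, String>>(Time.of(0, TimeUnit.SECONDS)) {
                //     @Override
                //     public long extractTimestamp(Tuple3<Long, String, String> element) {
                //         return element.f0;
                //     }
                // })
                .keyBy((KeySelector<Tuple3<Long, String, String>, String>) value -> value.f1);

        // Single-step pattern: any event whose third field is "fail".
        Pattern<Tuple3<Long, String, String>, Tuple3<Long, String, String>> pattern = Pattern.<Tuple3<Long, String, String>>begin("first")
                .where(new SimpleCondition<>() {
                    @Override
                    public boolean filter(Tuple3<Long, String, String> tuple3) {
                        return tuple3.f2.equals("fail");
                    }
                }).within(Time.of(10, TimeUnit.SECONDS));

        PatternStream<Tuple3<Long, String, String>> patternStream = CEP.pattern(stream, pattern);
        patternStream.select((PatternSelectFunction<Tuple3<Long, String, String>, String>) map -> {
            System.out.println("trigger:" + map.values().size());
            return "trigger";
        }).print("warning");
        env.execute();
    }
}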

Why does WatermarkStrategy not emit the match immediately, while BoundedOutOfOrdernessTimestampExtractor does? How can I get immediate triggering with WatermarkStrategy?


Solution

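  • WatermarkStrategy.forBoundedOutOfOrderness uses BoundedOutOfOrdernessWatermarks, whose onPeriodicEmit returns new Watermark(maxTimestamp - outOfOrdernessMillis - 1). The legacy BoundedOutOfOrdernessTimestampExtractor's getCurrentWatermark returns new Watermark(lastEmittedWatermark), where lastEmittedWatermark is currentMaxTimestamp - maxOutOfOrderness, with no - 1.
  • With a single event at timestamp 1000 and zero out-of-orderness, the new API therefore emits watermark 999, while the legacy extractor emits 1000. In event time, CEP buffers each event until the watermark reaches its timestamp, so with WatermarkStrategy the event stays buffered until a later event (for example the one at 2000) pushes the watermark to 1999. The extra - 1 is the whole difference.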
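The arithmetic above can be checked without a Flink cluster. The sketch below is plain Java with no Flink dependency: the two watermark formulas are copied from the answer, and `released` encodes the assumption that CEP in event time hands a buffered event to the pattern only once the current watermark is greater than or equal to the event's timestamp. The class and method names are hypothetical.

```java
// Stdlib-only model of the watermark arithmetic; it does not call any Flink API.
final class WatermarkOffByOne {

    // Mirrors BoundedOutOfOrdernessWatermarks.onPeriodicEmit (WatermarkStrategy.forBoundedOutOfOrderness).
    static long newApiWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Mirrors the legacy BoundedOutOfOrdernessTimestampExtractor.getCurrentWatermark (no "- 1").
    static long legacyWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis;
    }

    // Assumed CEP rule: a buffered event is processed once the watermark has reached its timestamp.
    static boolean released(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long event = 1000L;        // the single "fail" event from the question
        long outOfOrderness = 0L;  // Duration.ofSeconds(0)

        long wmNew = newApiWatermark(event, outOfOrderness);
        long wmLegacy = legacyWatermark(event, outOfOrderness);
        System.out.println("WatermarkStrategy: wm=" + wmNew + " released=" + released(event, wmNew));
        System.out.println("legacy extractor:  wm=" + wmLegacy + " released=" + released(event, wmLegacy));

        // A second event at 2000 ms moves the new-API watermark to 1999, which releases the first event.
        long wmAfterSecond = newApiWatermark(2000L, outOfOrderness);
        System.out.println("after 2nd event:   wm=" + wmAfterSecond + " released=" + released(event, wmAfterSecond));
    }
}
```

Running it gives watermark 999 (not released) for WatermarkStrategy, 1000 (released) for the legacy extractor, and 1999 (released) once the second event arrives, which matches the behaviour described in the question. To get the legacy behaviour with the new API, one possible approach (a sketch, not a built-in Flink option) is your own WatermarkGenerator whose onPeriodicEmit calls output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis)), wired in through WatermarkStrategy.forGenerator(...) followed by withTimestampAssigner(...). Passing a negative Duration to forBoundedOutOfOrderness to cancel the - 1 does not work, because BoundedOutOfOrdernessWatermarks rejects negative values. Keep in mind that Flink defines a watermark t as "no more events with timestamp <= t", so after dropping the - 1, a later event whose timestamp equals the current maximum would be treated as late by CEP.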