apache-flink flink-sql flink-cep

How to auto-generate watermarks in a Flink SQL table?


I am testing Flink CEP SQL. My watermark is defined on row_time, and my table is a Kafka table. Because the watermark is the minimum across all Kafka partitions, every new message has to wait for the partitions to align before CEP emits a result.

My Kafka table (the topic has 3 partitions) is defined as

create table test_table(
    agent_id String, room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time as to_timestamp_ltz(create_time, 3), 
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)

Here is my CEP SQL

select * from test_table  match_recognize (
   partition by agent_id,room_id,call_type 
   order by row_time
   measures  
       last(BF.create_time) as create_time, 
       last(AF.connect_time) as connect_time 
   one row per match after match SKIP PAST LAST ROW 
   pattern (BF+ AF) WITHIN INTERVAL '1' HOUR 
   define 
       BF as BF.connect_time = 0,
       AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type 
) as T ;

The CEP SQL produces the right results, but they always arrive late because the watermarks of all partitions have to align first. How can I get the newest result immediately, or auto-generate the watermark in a Flink SQL table?


Solution

  • Your pattern asks for a row with connect_time > 0 that comes immediately after a row with connect_time = 0 (where both rows have the same room_id and call_type). For this matching to be strictly correct, the engine has to wait for the watermarks. Otherwise, a premature match could be invalidated by the arrival of an out-of-order event, e.g., an event with connect_time < 0 right before AF. (You may know that's impossible, but the CEP/SQL engine can't know that.)

    If you are willing to relax the pattern matching semantics, you could replace this MATCH_RECOGNIZE query with an interval join (a self join with a temporal constraint). See https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins for details.

    BTW, this part of the definition of AF

    ... and BF.room_id = AF.room_id and BF.call_type = AF.call_type
    

    has no effect, since the stream is already partitioned by both room_id and call_type.
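
The interval join suggested above might look roughly like the sketch below. This is not a drop-in replacement for the MATCH_RECOGNIZE query: it pairs every connect_time = 0 row with every connect_time > 0 row from the same partition key within one hour, and it does not check that the rows are adjacent. The aliases b and a are made up for illustration, and the one-hour bound is taken from the WITHIN clause of the original pattern. As far as I understand, an inner interval join emits a joined row as soon as both sides have arrived and uses watermarks mainly to expire state, which is why the results should show up sooner.

```sql
-- Sketch only: relaxed semantics compared to the MATCH_RECOGNIZE pattern (BF+ AF).
-- b = candidate "before" rows (connect_time = 0), a = candidate "after" rows (connect_time > 0).
SELECT
    b.agent_id,
    b.room_id,
    b.call_type,
    b.create_time,
    a.connect_time
FROM test_table AS b
JOIN test_table AS a
    ON  b.agent_id  = a.agent_id
    AND b.room_id   = a.room_id
    AND b.call_type = a.call_type
    -- temporal constraint that makes this an interval join
    AND a.row_time BETWEEN b.row_time AND b.row_time + INTERVAL '1' HOUR
WHERE b.connect_time = 0
  AND a.connect_time > 0;
```

If several connect_time = 0 rows precede the same connect_time > 0 row, this query returns one output row per pair, so some downstream deduplication may be needed to approximate the "last BF" behaviour of the original measures.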