I am testing Flink CEP SQL. My table is a Kafka table, and its watermark is defined on a rowtime attribute. Because the watermark is the minimum of the watermarks of all Kafka partitions, every new message has to wait until the watermarks of all partitions align before CEP emits results.
My Kafka table (the topic has 3 partitions) is defined as:
```sql
create table test_table(
  agent_id String,
  room_id String,
  create_time Bigint,
  call_type String,
  application_id String,
  connect_time Bigint,
  row_time as to_timestamp_ltz(create_time, 3),
  WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)
```
Here is my CEP SQL:
```sql
select * from test_table match_recognize (
  partition by agent_id,room_id,call_type
  order by row_time
  measures
    last(BF.create_time) as create_time,
    last(AF.connect_time) as connect_time
  one row per match after match SKIP PAST LAST ROW
  pattern (BF+ AF) WITHIN INTERVAL '1' HOUR
  define
    BF as BF.connect_time = 0,
    AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type
) as T ;
```
The CEP SQL produces the right results, but they always arrive late because the watermarks of every partition have to align first. How can I get the newest result immediately, or have watermarks generated automatically in a Flink SQL table?
Your pattern is asking to find a row with `connect_time > 0` that comes immediately after a row where `connect_time = 0` (where both rows have the same `room_id` and `call_type`). To do this pattern matching fully correctly, it's necessary to wait for the watermarks. Otherwise, a premature match might be invalidated by the arrival of an out-of-order event, e.g., an event with `connect_time < 0` whose timestamp falls right before AF. (You may know that's impossible, but the CEP/SQL engine can't know that.)
If you are willing to relax the pattern matching semantics, why not replace this MATCH_RECOGNIZE query with an interval join (a self-join with a temporal constraint)? See https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins for details.
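A minimal sketch of such an interval self-join, assuming `test_table` exactly as defined in the question and reusing the 1-hour bound from the `WITHIN` clause. Note the relaxed semantics: it does not require the two rows to be adjacent, and it emits one row for every (BF, AF) pair inside the interval, so several `connect_time = 0` rows followed by one connect produce several results. As far as I know, an inner interval join can emit a pair as soon as both rows have arrived, with watermarks used mainly to expire old state.

```sql
-- Sketch: interval self-join as a relaxed replacement for MATCH_RECOGNIZE.
-- BF = a row with connect_time = 0, AF = a later row with connect_time > 0
-- for the same agent_id, room_id and call_type, at most 1 hour later.
SELECT
  BF.agent_id,
  BF.room_id,
  BF.call_type,
  BF.create_time,
  AF.connect_time
FROM test_table AS BF
JOIN test_table AS AF
  ON  BF.agent_id  = AF.agent_id
  AND BF.room_id   = AF.room_id
  AND BF.call_type = AF.call_type
  -- the time-bounded predicate on both rowtime attributes is what makes
  -- this an interval join (state older than the bound can be dropped)
  AND AF.row_time BETWEEN BF.row_time AND BF.row_time + INTERVAL '1' HOUR
WHERE BF.connect_time = 0
  AND AF.connect_time > 0;
```

If you only want the latest BF per AF, as `last(BF.create_time)` gives in the pattern, you would need further filtering on top of this join.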
BTW, this part of the definition of AF

```sql
... and BF.room_id = AF.room_id and BF.call_type = AF.call_type
```

has no effect, since the stream is already partitioned by both `room_id` and `call_type`.
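With those redundant conditions dropped, the query from the question could be written as below. It is the same pattern with the same semantics, only a simpler `DEFINE` clause; it still has to wait for the watermarks.

```sql
SELECT *
FROM test_table
MATCH_RECOGNIZE (
  PARTITION BY agent_id, room_id, call_type
  ORDER BY row_time
  MEASURES
    LAST(BF.create_time) AS create_time,
    LAST(AF.connect_time) AS connect_time
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (BF+ AF) WITHIN INTERVAL '1' HOUR
  DEFINE
    -- room_id and call_type are already equal within a partition,
    -- so only the connect_time conditions are needed
    BF AS BF.connect_time = 0,
    AF AS AF.connect_time > 0
) AS T;
```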