
Flink CEP SQL: events not triggering


I use a CEP pattern in Flink SQL, and it works as expected when connected to a Kafka broker. But when I connect to a cluster-based cloud Kafka setup, the Flink CEP pattern never triggers. Here is my SQL:

create table agent_action_detail 
(
    agent_id String, 
    room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time TIMESTAMP_LTZ(3),
    WATERMARK for row_time as row_time - INTERVAL '1' MINUTE
) 
with ('connector'='kafka', 'topic'='agent-action-detail', ...)

Then I send messages in JSON format, like:

{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}

In the Flink web UI, the watermark looks fine.

Then I run my CEP query:

select * from agent_action_detail
 match_recognize(
    partition by agent_id 
    order by row_time 
    measures 
        last(BF.create_time) as create_time, 
        first(AF.connect_time) as connect_time 
    one row per match AFTER MATCH SKIP PAST LAST ROW 
    pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
 )

In every Kafka message connect_time is > 0, but Flink never triggers a match. Can somebody help with this issue? Thanks in advance!


select * from agent_action_detail
 match_recognize(
    partition by agent_id
    order by row_time
    measures AF.connect_time as connect_time
    one row per match
    pattern (BF AF) WITHIN INTERVAL '1' second
    define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100
 )

The query above is another CEP attempt that still does not work. The agent_action_detail table is populated by another Flink SQL statement:

insert into agent_action_detail
select data.agent_id, data.room_id, data.create_time, data.call_type,
       data.application_id, data.connect_time, now()
from source_table
where type = 'xxx'

Solution

  • There are several things that can cause pattern matching to produce no results:

    This particular pattern loops with no exit condition: every event satisfies BF's condition, so BF+ can keep consuming events indefinitely. A pattern like this never lets the pattern-matching engine clear its internal state, which will lead to problems.

    If you were using Flink CEP directly, I would tell you to try adding until(condition) or within(time) to constrain the number of possible matches.

    With MATCH_RECOGNIZE, see if you can add a distinct terminating element to the pattern.
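    As a sketch of that advice applied to the first query: the DEFINE conditions below are hypothetical (they assume BF rows are the ones recorded before the agent connects, i.e. connect_time = 0), and the ten-minute WITHIN interval is an arbitrary placeholder. Because AF's condition excludes BF's, the first connected row ends the BF+ loop, and WITHIN discards partial matches that stay open too long.

    ```sql
    SELECT *
    FROM agent_action_detail
      MATCH_RECOGNIZE (
        PARTITION BY agent_id
        ORDER BY row_time
        MEASURES
          LAST(BF.create_time)   AS create_time,
          FIRST(AF.connect_time) AS connect_time
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        -- WITHIN bounds how long a partial match may stay open (placeholder value)
        PATTERN (BF+ AF) WITHIN INTERVAL '10' MINUTE
        DEFINE
          -- hypothetical: BF rows are those recorded before the agent connects
          BF AS BF.connect_time = 0,
          -- AF's condition excludes BF's, so the first connected row terminates the loop
          AF AS AF.connect_time > 0
      )
    ```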


    Update: since you are still getting no results after modifying the pattern, you should determine whether watermarking is the source of your problem. CEP relies on sorting the input stream by time, which depends on watermarks, but only if you are using event time.
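    A worked example using the table definition from the question: the sample message has row_time 2021-10-25 16:07:09.019Z, and the watermark is declared as row_time - INTERVAL '1' MINUTE. The watermark therefore trails the largest row_time seen so far by one minute, so that row can only be sorted and handed to MATCH_RECOGNIZE once a row with a row_time roughly one minute later (around 16:08:09) has been read. If nothing newer arrives, the row stays buffered and no match is emitted. If the cloud topic has several partitions (an assumption about your setup), the Kafka source's watermark is the minimum across its partitions, so a partition that receives no data can hold the watermark back indefinitely.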

    The easiest way to test this would be to switch to using processing time:

    create table agent_action_detail 
    (
        agent_id String, 
        ...
        row_time AS PROCTIME()
    )
    with (...)
    

    If that works, then either the timestamps or the watermarks are the problem. For example, if all of the events are late, you'll get no results. In your case, I'm wondering whether the row_time column has any data in it.
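    One way to check both at once, assuming your Flink version provides the built-in CURRENT_WATERMARK function (it is only available in newer releases), is to query the table directly and compare each row's timestamp with the watermark seen at that point in the pipeline:

    ```sql
    SELECT
      agent_id,
      row_time,
      -- NULL until a watermark has reached this operator
      CURRENT_WATERMARK(row_time) AS current_wm,
      -- TRUE means the row is already behind the watermark (late)
      row_time <= CURRENT_WATERMARK(row_time) AS is_late
    FROM agent_action_detail;
    ```

    A NULL row_time points at the timestamps; a current_wm that stays NULL or never advances points at the watermarks; is_late = TRUE for every row means all of the events are late.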


    If that doesn't reveal the problem, please share a minimal reproducible example, including the data needed to observe the problem.