
When will Flink clean up idle state in Flink CEP SQL?


I am using Flink CEP SQL with the Blink planner. Here is my SQL:

select * from test_table match_recognize (
   partition by agent_id,room_id,call_type 
   order by row_time -- processing time
   measures  
       last(BF.create_time) as create_time, 
       last(AF.connect_time) as connect_time 
   one row per match after match SKIP PAST LAST ROW 
   pattern (BF+ AF) WITHIN INTERVAL '10' SECOND 
   define 
       BF as BF.connect_time = 0,
       AF as AF.connect_time > 0  
) as T ;

The test_table is a Kafka table.

I set table.exec.state.ttl=10000, run my program, and keep sending messages.

Since I set both the state TTL and the CEP interval to 10 seconds, I expect the state size to stop growing about 10 seconds after the job starts.

In fact, the state keeps growing for at least 15 minutes. In addition, the JVM triggered two full GCs.


Is there any configuration I have missed?


Solution

  • You cannot use checkpoint sizes to estimate state size -- they are not related in any straightforward way. Checkpoints can include unpredictable amounts of in-flight, expired, or uncompacted data -- none of which would be counted as active state.

    I'm afraid there isn't any good tooling available for measuring exactly how much state you actually have. But if you are using RocksDB, then you can enable these metrics:

    state.backend.rocksdb.metrics.estimate-live-data-size
    state.backend.rocksdb.metrics.estimate-num-keys
    

    which will give you a reasonably accurate estimate (but you may pay a performance penalty for turning them on).

    As for your concern about CEP state -- you should be fine. Anytime you have a pattern that uses WITHIN, CEP should be able to clean the state automatically.
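
    As a minimal sketch of how the two RocksDB metrics named above could be switched on, assuming the job uses the RocksDB state backend and is configured through flink-conf.yaml (the state.backend line is an assumption about your setup, not something stated in the question):

    ```yaml
    # Assumption: RocksDB is the configured state backend for this job.
    state.backend: rocksdb

    # Both options are off by default; enabling them may cost some performance.
    state.backend.rocksdb.metrics.estimate-live-data-size: true
    state.backend.rocksdb.metrics.estimate-num-keys: true
    ```

    Once enabled, the values should show up among the operator metrics, where they give a better indication of live state than the checkpoint size does.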