I am using Flink CEP SQL with the Blink planner. Here is my SQL:
select * from test_table match_recognize (
partition by agent_id,room_id,call_type
order by row_time -- process time
measures
last(BF.create_time) as create_time,
last(AF.connect_time) as connect_time
one row per match after match SKIP PAST LAST ROW
pattern (BF+ AF) WITHIN INTERVAL '10' SECOND
define
BF as BF.connect_time = 0,
AF as AF.connect_time > 0
) as T ;
The test_table is a Kafka table.
I set table.exec.state.ttl=10000, started my program, and kept sending messages.
Since I set both the state TTL and the CEP interval to 10 seconds, I expected the state size to stabilize about 10 seconds after startup.
In practice, the state kept growing for at least 15 minutes, and the JVM triggered two full GCs.
Is there any configuration I am missing?
You cannot use checkpoint sizes to estimate state size -- they are not related in any straightforward way. Checkpoints can include unpredictable amounts of in-flight, expired, or uncompacted data -- none of which would be counted as active state.
I'm afraid there isn't any good tooling available for measuring exactly how much state you actually have. But if you are using RocksDB, then you can enable these metrics:
state.backend.rocksdb.metrics.estimate-live-data-size
state.backend.rocksdb.metrics.estimate-num-keys
which will give you a reasonably accurate estimate (but you may pay a performance penalty for turning them on).
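As a rough sketch, the two metrics could be switched on in flink-conf.yaml like this. The `state.backend: rocksdb` line is an assumption about your setup (it is the key used by Flink versions that still ship the Blink planner); leave it out if RocksDB is already configured elsewhere.

```yaml
# Assumption: RocksDB is the state backend; skip this line if it is already set.
state.backend: rocksdb

# Both metrics are disabled by default and may cost some performance when enabled.
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true
```

Once the job is restarted with these settings, the values should be reported per operator through whichever metrics reporter you have configured, so you can watch them over time instead of relying on checkpoint size.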
As for your concern about CEP state -- you should be fine. Any time you have a pattern that uses WITHIN, CEP should be able to clean up the state automatically.