apache-flink flink-streaming rocksdb rocksdb-java

Limiting the state size in Flink


I have a Flink job with an RMQ source, filters, a window (a tumbling window that fires every 2 seconds and uses processing time), an aggregate function, and a sink. The RocksDB state backend is enabled with incremental checkpoints (checkpointing happens every 10 s, with a 5 s pause between checkpoints). After 5 days, the checkpoint size is 196 MB and the state size, or delta data, is 191 MB.

Given this, should I assume that my state is growing and causing the checkpoints to grow? If so, how can I control my state size? I read that state size is limited by the available disk space. Does that mean that if I reduce the TaskManager pod memory (or set it equal to my Flink process memory), the state size will stay under control?

I am struggling with this increasing state size. Any help would really be appreciated.


The following shows the checkpoint sizes for a week:

[Image: checkpoint sizes over one week]

PRD environment: checkpoints are stored in EFS. The TaskManagers (3 TaskManagers with 5 slots each; every operator except the source, which has a parallelism of 1, has a parallelism of 15) and the JobManager run in Kubernetes (a clustered environment).

Below are the alignment duration and start delay after the job had run for 1 day.
[Image: checkpoint alignment duration and start delay after 1 day]


Solution

  • After all of the Q&A above, my best guess is that your state size is so small that RocksDB compaction isn't getting triggered. Assuming your TM heap size is set to a reasonable value, you should be able to test this by switching to the in-memory hash map state backend (HashMapStateBackend).
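
For reference, here is a minimal sketch of what that test could look like. It assumes a Flink 1.x release from 1.13 onward (where `HashMapStateBackend` and `setStateBackend` are available) and reuses the 10 s interval and 5 s pause from the question. The class name, job name, and the tiny `fromElements` pipeline are placeholders, not your actual job. Keep in mind that the hash map backend holds all state on the TaskManager JVM heap and does not take incremental checkpoints, so each checkpoint size should reflect the full state size.

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical class name, used only for this illustration.
public class HashMapBackendTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Same checkpoint cadence as described in the question:
        // a checkpoint every 10 s, with at least 5 s between checkpoints.
        env.enableCheckpointing(10_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000L);

        // Replace the RocksDB backend with the heap-based hash map backend.
        // State now lives on the TaskManager JVM heap, so the heap must be
        // large enough to hold it.
        env.setStateBackend(new HashMapStateBackend());

        // Placeholder pipeline (assumption): substitute the real
        // RMQ source -> filters -> window -> aggregate -> sink here.
        env.fromElements("a", "b", "c").print();

        env.execute("hashmap-backend-test");
    }
}
```

If the checkpoint size stays roughly flat with this backend over a day or two, that would be consistent with the guess that the growth seen under RocksDB comes from uncompacted files rather than from the state itself.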