Tags: java, apache-flink, rocksdb

Why does the Flink checkpoint size keep growing with RocksDB?


I have a Flink job that consumes data from Kafka.

After a keyBy, it sinks the data to Doris and Kafka.

Inside the keyed operator I only use ValueState&lt;T&gt;, where T is an ordinary JavaBean. However, the checkpoint size keeps increasing.

Each checkpoint also blocks processing. Although I have set state.backend.async: true, the consumption of Kafka data appears to pause while a checkpoint is being taken, so Kafka records accumulate. When I use the HashMap state backend instead of RocksDB, the problem disappears.

I then ran a test with a simple ValueState&lt;Boolean&gt;: I continuously set it to true and watched the checkpoints. Their size kept increasing as well.

Could you tell me why this is happening and how I should handle it?

[screenshot: checkpoint size keeps growing]



    import java.time.LocalDateTime;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.StateBackendOptions;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.util.Collector;

    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        cfg.setString("s3.access-key", "xx");
        cfg.setString("s3.secret-key", "xxxx");
        cfg.setString("s3.endpoint", "http://xxx:xxx");
        cfg.setString("s3.path.style.access", "true");
        // with rocksdb the checkpoint size keeps growing
        cfg.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
        // with hashmap the checkpoint size stays fixed at about 18.3 KB
        // cfg.set(StateBackendOptions.STATE_BACKEND, "hashmap");

        FileSystem.initialize(cfg, null);
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createLocalEnvironmentWithWebUI(cfg)
                .setParallelism(1);
        env.enableCheckpointing(1_000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage("file:///d:/.roy/.tmp/rocksdb/");
        env.getCheckpointConfig().setCheckpointStorage("s3://flink-dev/cpsdev/");
        env.configure(cfg);

        env.addSource(new RichSourceFunction<String>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                isRunning = true;
                while (isRunning) {
                    ctx.collect("aaa " + LocalDateTime.now() + " " + System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        })
        .name("source")
        .map(value -> {
            String[] ss = value.split(" ");
            return new Tuple3<String, String, Long>(ss[0], ss[1], Long.parseLong(ss[2]));
        })
        .name("map as tuple2")
        .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
        //.assignTimestampsAndWatermarks(WatermarkStrategy
        //        .<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //        .withTimestampAssigner((e, t) -> e.f2))
        .keyBy(e -> e.f0)
        .process(new KeyedProcessFunction<String, Tuple3<String, String, Long>, Tuple3<String, String, Long>>() {
            private static final long serialVersionUID = 1L;
            private ValueState<Boolean> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<Boolean> vd = new ValueStateDescriptor<Boolean>("value", Types.BOOLEAN);
                // enabling state TTL did not stop the checkpoint growth either
//                StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.seconds(2))
//                        .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
//                        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
//                        .cleanupInRocksdbCompactFilter(1000L)
//                        .cleanupIncrementally(10, true)
//                        .build();
//                vd.enableTimeToLive(ttl);
                state = getRuntimeContext().getState(vd);
            }

            @Override
            public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
                // removing this update keeps the checkpoint size fixed
                for (int i = 0; i < 10000; i++) {
                    state.update(true);
                }
                out.collect(value);
            }
        })
        .name("process keyby")
        .print()
        .name("print>>")
        ;
        env.execute("t4");
        
        
    }


I have also tried putting the TimerService on the heap and tuning RocksDB's parameters, but it did not help.
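
Roughly, that attempt looked like the sketch below (simplified; the predefined tuning profile shown is just an example, not the exact values I used):

    // requires org.apache.flink.contrib.streaming.state.PredefinedOptions
    // keep timers on the JVM heap instead of inside RocksDB
    cfg.setString("state.backend.rocksdb.timer-service.factory", "HEAP");

    // apply one of RocksDB's predefined tuning profiles
    EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
    backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
    env.setStateBackend(backend);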


Solution

  • Over time, checkpoints will increase in size for two main reasons:

    1. The volume of live data in the state backend is growing, most often because the number of active keys is growing.
    2. The volume of garbage in RocksDB is growing because it hasn't started compacting yet.

    RocksDB stores state in an LSM tree. Previous values remain in the tree until a background compaction process removes them, and compaction doesn't start until the tree has accumulated enough data. That is why even repeatedly writing the same ValueState&lt;Boolean&gt; grows the checkpoint: every update appends a new entry, and the stale entries linger until compaction runs.

    If you want more meaningful metrics, you can enable some of the RocksDB metrics, such as state.backend.rocksdb.metrics.estimate-live-data-size. However, doing so will negatively impact performance.
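
    As a minimal sketch (the option names below come from Flink's RocksDBNativeMetricOptions; verify them against your Flink version), enabling those metrics on the Configuration from the question looks like this:

        // enable a few RocksDB native metrics (this can cost performance)
        // estimated size of the live (non-garbage) data
        cfg.setString("state.backend.rocksdb.metrics.estimate-live-data-size", "true");
        // total size of all SST files, including not-yet-compacted garbage
        cfg.setString("state.backend.rocksdb.metrics.total-sst-files-size", "true");
        // estimated number of keys in the state backend
        cfg.setString("state.backend.rocksdb.metrics.estimate-num-keys", "true");

    Comparing estimate-live-data-size with total-sst-files-size tells you which of the two causes above you are seeing: if the live-data estimate stays flat while the SST total grows, the growth is uncompacted garbage rather than real state.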