I have a Flink job that reads data from Kafka, performs a keyBy, and then sinks the data to Doris and to Kafka. Inside the keyed operator I only use ValueState<T> to store state, where T is an ordinary JavaBean. However, the checkpoint size keeps increasing, and every checkpoint blocks the job's processing. Although I have set state.backend.async: true, the job appears to stop consuming from Kafka while a checkpoint is in progress, so Kafka data accumulates. When I switch from RocksDB to the HashMap state backend, the problem disappears.

I then ran a test with a simple ValueState<Boolean>: I continuously set it to true and observed the checkpoints. Their size also kept increasing. Could you tell me why this is happening and how I should handle it?
public static void main(String[] args) throws Exception {
    Configuration cfg = new Configuration();
    cfg.setString("s3.access-key", "xx");
    cfg.setString("s3.secret-key", "xxxx");
    cfg.setString("s3.endpoint", "http://xxx:xxx");
    cfg.setString("s3.path.style.access", "true");
    // with rocksdb the checkpoint size keeps growing
    cfg.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
    // with hashmap the checkpoint size stays fixed at about 18.3 KB
    // cfg.set(StateBackendOptions.STATE_BACKEND, "hashmap");
    FileSystem.initialize(cfg, null);

    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(cfg)
            .setParallelism(1);
    env.enableCheckpointing(1_000, CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    // env.setStateBackend(new HashMapStateBackend());
    // env.getCheckpointConfig().setCheckpointStorage("file:///d:/.roy/.tmp/rocksdb/");
    env.getCheckpointConfig().setCheckpointStorage("s3://flink-dev/cpsdev/");
    env.configure(cfg);

    env.addSource(new RichSourceFunction<String>() {
        private static final long serialVersionUID = 1L;
        private volatile boolean isRunning;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            isRunning = true;
            while (isRunning) {
                ctx.collect("aaa " + LocalDateTime.now() + " " + System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    })
    .name("source")
    .map(value -> {
        String[] ss = value.split(" ");
        return new Tuple3<String, String, Long>(ss[0], ss[1], Long.parseLong(ss[2]));
    })
    .name("map as tuple3")
    .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
    // .assignTimestampsAndWatermarks(WatermarkStrategy
    //         .<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    //         .withTimestampAssigner((e, t) -> e.f2))
    .keyBy(e -> e.f0)
    .process(new KeyedProcessFunction<String, Tuple3<String, String, Long>, Tuple3<String, String, Long>>() {
        private static final long serialVersionUID = 1L;
        private ValueState<Boolean> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ValueStateDescriptor<Boolean> vd = new ValueStateDescriptor<Boolean>("value", Types.BOOLEAN);
            // the TTL config did not help
            // StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.ofSeconds(2))
            //         .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
            //         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            //         .cleanupInRocksdbCompactFilter(1000L)
            //         .cleanupIncrementally(10, true)
            //         .build();
            // vd.enableTimeToLive(ttl);
            state = getRuntimeContext().getState(vd);
        }

        @Override
        public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
            for (int i = 0; i < 10000; i++) {
                // remove this update and the checkpoint size stays fixed
                state.update(true);
            }
            out.collect(value);
        }
    })
    .name("process keyby")
    .print()
    .name("print>>");

    env.execute("t4");
}
I have also tried putting the TimerService on the heap and tuning RocksDB's parameters.
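For reference, this is roughly how I moved the timer state onto the heap. This is only a sketch; depending on the Flink version it is either the state.backend.rocksdb.timer-service.factory option or a setter on the state backend:

// option key, e.g. in flink-conf.yaml or the Configuration above
cfg.setString("state.backend.rocksdb.timer-service.factory", "HEAP");

// or programmatically on the state backend
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
backend.setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
env.setStateBackend(backend);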
Over time, checkpoints will increase in size mainly because RocksDB stores its state in an LSM tree: previous values remain in the tree until they are removed by the background compaction process, and compaction doesn't start running until the tree has grown fairly full.
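This likely also explains why the commented-out TTL config in the question appears to do nothing: with RocksDB, the cleanupInRocksdbCompactFilter strategy only drops expired entries when compaction actually processes them. A minimal sketch of that strategy, where vd is the ValueStateDescriptor from the question (older Flink versions take a Time argument in newBuilder, newer ones java.time.Duration):

// expired entries are removed only when the RocksDB compaction filter visits them;
// the filter re-reads the current timestamp after every 1000 processed entries
StateTtlConfig ttl = StateTtlConfig.newBuilder(Duration.ofSeconds(2))
        .cleanupInRocksdbCompactFilter(1000L)
        .build();
vd.enableTimeToLive(ttl);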
If you want more meaningful metrics, you can enable some of the RocksDB native metrics, such as state.backend.rocksdb.metrics.estimate-live-data-size. However, doing so will negatively impact performance.
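A sketch of turning that on through the Configuration used in the question (the same keys work in flink-conf.yaml; verify the exact state.backend.rocksdb.metrics.* keys against your Flink version):

// report RocksDB's own estimate of the live data size as a Flink metric
cfg.setString("state.backend.rocksdb.metrics.estimate-live-data-size", "true");
// optionally also expose the total on-disk size of the SST files
cfg.setString("state.backend.rocksdb.metrics.total-sst-files-size", "true");

Comparing the live-data estimate with the reported checkpoint size should make it clearer how much of the growth is uncompacted garbage rather than live state.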