Does the KafkaSink, which implements TwoPhaseCommittingSink,
support checkpointing implicitly, or do I have to add any code for it?
I have a simple Flink application that reads from a KafkaSource. I want to handle the offsets manually in the Kafka consumer so that if my app breaks down it recovers from the checkpoint without data loss or duplication.
Consider that I am reading offsets 1 --> 100 and my Flink application fails at offset 87.
Using the checkpoint, it should then restart from offset 87, not from earliest or latest.
As of now, it always restarts from 1 if I have no delay in processing.
But if I add Thread.sleep(5000) in .map(), then it works as expected and tries to read from 87.
As I read in this post - https://stackoverflow.com/a/70382650/4951838 - since I am using .print(), which does not support exactly-once semantics, it takes 2-3 seconds to sync the checkpoint with the Kafka offsets.
So if I use a KafkaSink with an exactly-once guarantee, will it solve my issue?
Checkpoint config -
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointStorage("SomeS3Url");
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        1,                            // number of restart attempts
        Time.of(1, TimeUnit.SECONDS)  // delay
));
Kafka consumer -
KafkaSource<Long> dataSource = KafkaSource.<Long>builder()
.setBootstrapServers(brokers)
.setTopics("Alerts.Checkpoint")
.setGroupId("checkpoint-alertss-1")
.setProperties(prop)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setClientIdPrefix("Alerts_01_temp_2")
.setDeserializer("custom deserializer")
.build();
As you can see below, I want to fail the application when the consumer reaches a certain offset.
DataStreamSource<Long> alertStream = env.fromSource(dataSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
alertStream
    .map(new MapFunction<Long, String>() {
        @Override
        public String map(Long value) throws Exception {
            // value is the offset of the message
            if (value == 87) {
                throw new Exception("Simulated failure at offset " + value);
            }
            return "Offset - " + value;
        }
    })
    .print();
The KafkaSink natively supports Flink's checkpointing (snapshotting) mechanism, and no additional code is required to enable it. You shouldn't handle offset management manually in your code.
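For reference, a minimal sketch of what an exactly-once KafkaSink could look like - the output topic, transactional-id prefix and transaction timeout below are placeholder values for illustration, and it assumes String values serialized with SimpleStringSchema:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> alertSink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("Alerts.Checkpoint.Out")           // placeholder output topic
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // ties the Kafka producer transactions to Flink's checkpoints
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("alerts-checkpoint-sink")  // required for EXACTLY_ONCE
        // transaction timeout must be larger than the checkpoint interval
        // and must not exceed the broker's transaction.max.timeout.ms
        .setProperty("transaction.timeout.ms", "600000")
        .build();

// replace .print() with the sink, e.g.
alertStream
        .map(value -> "Offset - " + value)
        .sinkTo(alertSink);

With EXACTLY_ONCE the produced records are only committed when a checkpoint completes, so consumers of the output topic should read with isolation.level=read_committed to avoid seeing data from aborted transactions.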