Does KafkaSink, which implements TwoPhaseCommittingSink, support checkpointing implicitly, or do I have to add code for it?
I have a simple Flink application that reads from a KafkaSource. I want to handle the offsets manually in the Kafka consumer so that, if my app crashes, it recovers from the checkpoint without data loss or duplication.
Suppose I am reading offsets 1 to 100 and my Flink application fails at offset 87. Using the checkpoint, it should restart from offset 87, not from the earliest or latest offset.
Currently, if there is no delay in processing, it always restarts from 1. But if I add Thread.sleep(5000) in .map(), it works as expected and tries to read from 87.
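The behaviour described above is consistent with how checkpoint recovery works in general: on restart, a source resumes from the position stored in the last *completed* checkpoint, and if the failure happens before any checkpoint has completed, there is nothing to restore, so the source starts again from its configured starting offsets. The sketch below is a toy model in plain Java, not Flink's API. The class name `CheckpointRecoveryModel`, the `run` method, and the idea of taking a checkpoint every N records (real Flink checkpoints are time-based) are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink API): a source that resumes from its last completed checkpoint. */
public class CheckpointRecoveryModel {

    /**
     * Reads offsets startOffset..endOffset, "completes a checkpoint" after every
     * checkpointEvery records, and fails once when it first reaches failAt.
     * Returns every offset processed, including those replayed after the restart.
     */
    static List<Long> run(long startOffset, long endOffset, long failAt, int checkpointEvery) {
        List<Long> processed = new ArrayList<>();
        Long lastCheckpoint = null; // next offset to read, as stored in the last completed checkpoint
        boolean failed = false;
        int sinceCheckpoint = 0;
        long position = startOffset;
        while (position <= endOffset) {
            if (!failed && position == failAt) {
                failed = true;
                // Restart: restore from the last completed checkpoint if there is one,
                // otherwise fall back to the configured starting offset.
                position = (lastCheckpoint != null) ? lastCheckpoint : startOffset;
                sinceCheckpoint = 0;
                continue;
            }
            processed.add(position);
            position++;
            sinceCheckpoint++;
            if (sinceCheckpoint == checkpointEvery) {
                lastCheckpoint = position; // checkpoint records the next offset to read
                sinceCheckpoint = 0;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int beforeFailure = 86; // offsets 1..86 are processed before the failure at 87
        // Fast processing: the failure hits before any checkpoint completes.
        System.out.println(run(1, 100, 87, 1000).get(beforeFailure)); // prints 1
        // Slow processing: a checkpoint completed at offset 81 before the failure.
        System.out.println(run(1, 100, 87, 10).get(beforeFailure)); // prints 81
    }
}
```

Note that even with a checkpoint, the restart resumes from the checkpointed offset (81 here), not exactly from the failing record, so offsets 81 to 86 are processed twice; avoiding duplicates in the output is the sink's job.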
According to this post, since I am using .print(), the pipeline does not support exactly-once semantics, and that is why it takes 2-3 seconds for the checkpoint to sync with the Kafka offsets.
So if I use KafkaSink with the exactly-once guarantee, will it solve my issue?
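For context on what an exactly-once sink changes: a two-phase-commit sink writes records into an open transaction that only becomes visible once the checkpoint covering those records completes; on failure, the open transaction is aborted, so records replayed after the restore replace the discarded ones instead of duplicating them. The sketch below is a simplified toy model in plain Java, not the Flink or Kafka API. `TwoPhaseCommitModel`, its methods, and committing directly at the checkpoint (real sinks pre-commit on the checkpoint barrier and commit after the checkpoint completes) are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not the Flink or Kafka API) of a two-phase-commit sink. */
public class TwoPhaseCommitModel {
    private final List<Long> committed = new ArrayList<>();       // what read_committed readers see
    private final List<Long> openTransaction = new ArrayList<>(); // written but not yet visible

    void write(long record) { openTransaction.add(record); }

    /** Checkpoint completed: the open transaction becomes visible. */
    void commit() { committed.addAll(openTransaction); openTransaction.clear(); }

    /** Failure before the checkpoint completed: the open transaction is discarded. */
    void abort() { openTransaction.clear(); }

    /** Checkpoint every N records, one failure at failAt; returns the committed output. */
    static List<Long> run(long startOffset, long endOffset, long failAt, int checkpointEvery) {
        TwoPhaseCommitModel sink = new TwoPhaseCommitModel();
        long lastCheckpoint = startOffset; // no checkpoint yet: restart from the starting offset
        boolean failed = false;
        int sinceCheckpoint = 0;
        long position = startOffset;
        while (position <= endOffset) {
            if (!failed && position == failAt) {
                failed = true;
                sink.abort();              // uncommitted output since the last checkpoint is dropped
                position = lastCheckpoint; // the source rewinds to the checkpointed offset
                sinceCheckpoint = 0;
                continue;
            }
            sink.write(position);
            position++;
            if (++sinceCheckpoint == checkpointEvery) {
                sink.commit();
                lastCheckpoint = position;
                sinceCheckpoint = 0;
            }
        }
        sink.commit(); // final checkpoint at the end of the input
        return sink.committed;
    }

    public static void main(String[] args) {
        List<Long> out = run(1, 100, 87, 10);
        System.out.println("committed " + out.size() + " records"); // prints "committed 100 records"
    }
}
```

In this model the source still rewinds to the last checkpoint (offset 81), but the replayed records 81 to 86 appear only once in the committed output.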
Checkpoint config:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        1, // number of restart attempts
        Time.of(1, TimeUnit.SECONDS) // delay
));
Kafka consumer:
KafkaSource<Long> dataSource = KafkaSource.<Long>builder()
        .setProperties(prop)
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setDeserializer("custom deserializer")
        .build();
As you can see below, I want to fail the application when the consumer reaches a certain offset.
DataStreamSource<Long> alertStream = env.fromSource(dataSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
alertStream
        .map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                if (value == 87) { // value is the offset of a message
                    throw new Exception();
                }
                return "Offset - " + value;
            }
        })
        .print();
KafkaSink natively supports Flink's snapshotting mechanism, and no additional code is required to enable it. You shouldn't manage the offsets manually in your code.
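What remains is configuration rather than offset-handling code: checkpointing has to be enabled, because KafkaSink only commits its transactions when a checkpoint completes, and the delivery guarantee has to be chosen on the builder. The following is a sketch assuming Flink 1.15 or later with the `flink-connector-kafka` dependency and a reachable broker; the broker address, topic name, transactional ID prefix, checkpoint interval, and the `fromElements` stand-in stream are hypothetical placeholders.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // KafkaSink commits its transactions on checkpoint completion, so checkpointing
        // must be enabled; the 5 s interval is an arbitrary example value.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // Hypothetical stand-in for the mapped Kafka stream from the question.
        DataStream<String> alerts = env.fromElements("Offset - 1", "Offset - 2");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")           // hypothetical broker address
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("alerts-out")                    // hypothetical output topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("alerts-app")           // hypothetical; unique per application
                // Keep the producer transaction timeout at or below the broker's
                // transaction.max.timeout.ms (15 minutes by default).
                .setProperty("transaction.timeout.ms", "900000")
                .build();

        alerts.sinkTo(sink);
        env.execute("exactly-once KafkaSink sketch");
    }
}
```

Consumers of the output topic only get the exactly-once view if they read with `isolation.level=read_committed`; with the default `read_uncommitted`, they can also see records from transactions that were later aborted.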