Tags: apache-flink, kafka-consumer-api, offset, failover, checkpoint

How does Flink handle Kafka offsets in checkpointing when the application fails mid-stream?


Does the KafkaSink, which implements TwoPhaseCommittingSink, support checkpointing implicitly, or do I have to add any code for it?

I have a simple Flink application that reads from a KafkaSource. I want the Kafka consumer offsets handled in such a way that if my app breaks down, it recovers from the checkpoint without data loss or duplication.

Consider that I am reading offsets 1 --> 100. If my Flink application fails at offset 87, then on restart from the checkpoint it should resume from offset 87, not from earliest or latest. As of now, it always restarts from offset 1 if there is no delay in processing. But if I add Thread.sleep(5000) in .map(), it works as expected and resumes from offset 87.

As I read in this post - https://stackoverflow.com/a/70382650/4951838

Since I am using .print(), which does not support exactly-once semantics, it takes 2-3 seconds for the checkpoint to sync with the Kafka offsets.

So if I use KafkaSink with exactly-once guarantee, will it solve my issue?

Checkpoint config -

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointStorage("SomeS3Url");
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        1, // number of restart attempts
        Time.of(1, TimeUnit.SECONDS) // delay
));

Kafka consumer -

KafkaSource<Long> dataSource = KafkaSource.<Long>builder()
        .setBootstrapServers(brokers)
        .setTopics("Alerts.Checkpoint")
        .setGroupId("checkpoint-alertss-1")
        .setProperties(prop)
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setClientIdPrefix("Alerts_01_temp_2")
        .setDeserializer("custom deserializer")
        .build();

As you can see below, I want to fail the application when the consumer reaches a certain offset.

DataStreamSource<Long> alertStream = env.fromSource(dataSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

alertStream
    .map(new MapFunction<Long, String>() {
        @Override
        public String map(Long value) throws Exception {
            if (value == 87) // value is the offset of the message
                throw new Exception("Simulated failure at offset 87");
            return "Offset - " + value;
        }
    })
    .print();

Solution

  • The KafkaSink natively supports Flink's checkpointing (snapshotting) mechanism, and no additional code is required to enable it. You should not manage Kafka offsets manually in your code; on recovery, the KafkaSource resumes from the offsets stored in the checkpoint.
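A minimal sketch of wiring such a sink, assuming the output topic name (`Alerts.Out`), the transactional-id prefix, and the use of `SimpleStringSchema` are illustrative placeholders (the output topic and serializer are not given in the question):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class AlertsSinkSketch {
    public static KafkaSink<String> build(String brokers) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("Alerts.Out") // assumed output topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // EXACTLY_ONCE makes the sink write inside Kafka transactions
                // that commit only when the enclosing checkpoint completes.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Required for EXACTLY_ONCE: a stable prefix for transactional ids.
                .setTransactionalIdPrefix("alerts-sink")
                .build();
    }
}
```

Two things to keep in mind with EXACTLY_ONCE: downstream consumers must read with `isolation.level=read_committed` to see only committed records, and the producer's `transaction.timeout.ms` must be larger than your checkpoint interval, or transactions can be aborted before the checkpoint completes.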