
Apache Flink: restoring state from a checkpoint after changing the Kafka topic


I ran into unexpected behavior when I needed to start a job from a checkpoint and change the Kafka topic. In this case, Flink restores the Kafka consumer's state for the previously defined topic (the last committed offsets and the consumer group ID). As a result, the Kafka consumer starts consuming messages from two topics: the former one, restored from state, and the new one, defined in the configuration when the job starts.

This is very confusing, and in the end it's not clear whether it's a bug or a feature. Is there a way to recover a job from a checkpoint without restoring the state of the Kafka consumers, and instead initialize them from the parameters in the configuration? I need the previous job state, but I want to read new data from a different topic!


Solution

  • If you change the UID of the KafkaSource (or FlinkKafkaConsumer) and restart the job with allowNonRestoredState enabled, then you'll get the behavior you are looking for.

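    Flink maps the state in a checkpoint or savepoint back to operators by their UIDs. Changing the UID (or setting one, if you haven't explicitly set one, since Flink otherwise uses an auto-generated ID) means the saved Kafka offsets no longer match any operator, so they are not restored. allowNonRestoredState then overrides Flink's built-in protection against losing state, which would otherwise fail the restore because part of the saved state can't be mapped to the new job.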
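To make the mechanism concrete, here is a toy model in plain Java. It is an illustration only, not Flink's actual restore code or API: saved state is keyed by operator UID, state whose UID no longer exists in the new job makes the restore fail unless allowNonRestoredState is set, and an operator whose UID the snapshot doesn't know starts without state. The UIDs `kafka-source-v1`, `kafka-source-v2`, and `aggregator` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model (NOT Flink's real API): snapshot state is keyed by operator UID.
public class UidRestoreModel {

    /** State handed to operators of the new job, plus the UIDs that were dropped. */
    record RestoreResult(Map<String, String> restored, Set<String> unmatchedUids) {}

    static RestoreResult restore(Map<String, String> snapshot,
                                 Set<String> newJobUids,
                                 boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        Set<String> unmatched = new TreeSet<>();
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
            if (newJobUids.contains(e.getKey())) {
                // Same UID in the new job: its state is restored.
                restored.put(e.getKey(), e.getValue());
            } else {
                // No operator with this UID any more: the state is orphaned.
                unmatched.add(e.getKey());
            }
        }
        // Built-in protection: refuse to silently drop orphaned state.
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                "Snapshot state for UIDs " + unmatched + " cannot be mapped to the new job");
        }
        // Operators of the new job absent from 'restored' start with empty state.
        return new RestoreResult(restored, unmatched);
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new HashMap<>();
        snapshot.put("kafka-source-v1", "offsets of the old topic");
        snapshot.put("aggregator", "running totals");

        // The source got a new UID; the stateful aggregator kept its UID.
        Set<String> newJob = Set.of("kafka-source-v2", "aggregator");

        try {
            restore(snapshot, newJob, false);
        } catch (IllegalStateException ex) {
            System.out.println("without allowNonRestoredState: " + ex.getMessage());
        }

        RestoreResult r = restore(snapshot, newJob, true);
        System.out.println("restored: " + r.restored().keySet());
        System.out.println("dropped: " + r.unmatchedUids());
        System.out.println("new source has restored state? "
            + r.restored().containsKey("kafka-source-v2"));
    }
}
```

In a real job, the equivalent change is to give the source a new UID, e.g. `env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("kafka-source-v2")` (the UID string is hypothetical), keep the UIDs of the stateful operators whose state you want back unchanged, and resubmit with something like `flink run -s <checkpoint-path> --allowNonRestoredState <job-jar>`. Since the new KafkaSource has no restored offsets, it starts from whatever its configured `OffsetsInitializer` specifies.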