apache-kafka, apache-flink, apache-iceberg

Data Loss in Flink Job with Iceberg Sink After Restart: How to Ensure Consistent Writes?


I am running a Flink job that reads data from Kafka, processes it into Flink Row objects, and writes them to an Iceberg sink. To deploy new code changes, I restart the job from the latest savepoint, which is taken at the Kafka source. This savepoint stores the most recent Kafka offsets read by the job, so the next run resumes from where the previous one left off, making failure recovery transparent. Recently, I encountered a scenario where data was lost from the final output after a job restart. Here's what happened:

  1. The job reads from Kafka starting at offset 1 and writes the data to an Iceberg table backed by S3.
  2. It then reads from offset 2, but the writes to the Iceberg table are delayed due to backpressure or network issues.
  3. The job proceeds to read from offset 3.
  4. Before the data from offset 2 is written to S3, I restart the job. After the restart, the job begins reading from offset 3, resulting in the loss of data from offset 2, which was never written to S3.

Is there a workaround for this problem, or is it an inherent limitation of the Iceberg Sink?


Solution

  • Leaving an update here for others who might run into this in the future, as this was rather tricky to debug. We are currently running Iceberg 0.11.x on Flink 1.11; there is a bug, fixed in #2745, where the Iceberg sink did not explicitly set operator UIDs.

    If the UID is not set explicitly and your job graph changes while deploying a change from a checkpoint, Flink won't be able to restore the previous sink operator state, specifically the committer operator state. You can get past the restore failure with the --allowNonRestoredState flag, but this is risky: during the restore process, Flink uses the sink state to verify whether checkpointed files were actually committed to the Iceberg table. Dropping that state with --allowNonRestoredState can therefore lead to data loss, because the Iceberg commit might have failed in the last completed checkpoint.
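    For reference, a minimal sketch of pinning a stable UID on the Iceberg sink, assuming Iceberg's Flink connector `FlinkSink` builder (the `uidPrefix` option ships in releases that include the #2745 fix; on 0.11.x you would need to backport it). The table path and names here are placeholders, not from the original setup:

    ```java
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    public class IcebergSinkUid {
        public static void attachSink(DataStream<RowData> rows) {
            // Placeholder warehouse path; use your own catalog/table loader.
            TableLoader tableLoader =
                TableLoader.fromHadoopTable("s3://my-bucket/warehouse/db/events");

            FlinkSink.forRowData(rows)
                .tableLoader(tableLoader)
                // A stable prefix for the writer/committer operator UIDs.
                // Keeping this constant across deployments lets Flink map the
                // committer state back to the sink even if the surrounding
                // job graph changes between releases.
                .uidPrefix("iceberg-sink-events")
                .append();
        }
    }
    ```

    With a stable UID in place, restarting from a savepoint no longer requires --allowNonRestoredState, so the committer state survives the restore and uncommitted checkpointed files are reconciled instead of silently dropped.
    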