Tags: java, apache-flink, amazon-kinesis

Incompatible Checkpoint and application versions with AWS Managed Flink


I have an AWS Managed Apache Flink application.

We have two input data streams and two output data streams.

Each time we make bigger changes (e.g. we change the data formats of the input and output streams and their Java objects), the redeployed application fails to resume processing from the latest checkpoint, and we have to restart it without a checkpoint.

We get errors like this:

Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint s3://90adba2e0a9002371abdf5ee58da7883e350fe4d/a4aa7fa6aabe9853f35082abc0fd1320-338785721659-1663851015791/savepoints/115/attempt-1/savepoint-a4aa7f-7f08cc16a519. Cannot map checkpoint/savepoint state for operator fb8f6c016d8c769a70a72e4189372e48 to the new program, because the operator is not available in the new program.

My question is: how can we work around this issue? We would really like to avoid a full restart, because not all of our input streams are affected by the change, and the unaffected ones carried crucial data in the past that we would like to restore. Is there any way to map old checkpoints to the new state, as a migration, or at least to save the parts of the state that were unaffected?

To illustrate what I mean: say we have two input streams (Stream "A" and Stream "B") and two output streams (Output "C" and Output "D"). I modify the Flink code and change the data format of Stream A and Output C, but not of Stream B and Output D. Because of the breaking change, we lose our whole state. Is there a way to map from the older checkpoint, or at least to save the state of the path Stream "B" -> proc 4 -> proc 5 -> Output "D"?

[Figure: my simplified Apache Flink processor architecture]
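Flink restores a savepoint by matching each piece of operator state to an operator in the new job by its ID; the error above says the savepoint holds state for operator fb8f6c016d8c769a70a72e4189372e48 and no operator in the new job has that ID. The toy model below is plain Java, not Flink API, and its operator IDs are hypothetical. It sketches that matching rule and what Flink's `allowNonRestoredState` option changes: unmatched state is dropped instead of failing the restore, while state whose ID still exists in the new job is kept. In AWS Managed Flink the corresponding switch is the `AllowNonRestoredState` field of the application's `FlinkRunConfiguration`. It can only preserve path B's state if path B's operators keep the same IDs across the change.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Flink code): a savepoint is a map from operator ID to that
// operator's state, and a job is the list of operator IDs it contains.
final class RestoreModel {

    /**
     * Mimics how Flink matches savepoint state to operators by ID and returns
     * the state the new job would start with.
     *
     * @param allowNonRestoredState if true, state of operators missing from the new
     *                              job is dropped; if false, it aborts the restore
     */
    static Map<String, String> restore(Map<String, String> savepoint,
                                       List<String> newOperatorIds,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : savepoint.entrySet()) {
            if (newOperatorIds.contains(e.getKey())) {
                // The ID still exists, so this operator gets its old state back.
                restored.put(e.getKey(), e.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException("Cannot map state for operator "
                        + e.getKey() + " to the new program");
            }
            // Otherwise the state of a removed or renamed operator is skipped.
        }
        // New-job operators that are not in this map start with empty state.
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("procA-v1", "A-state");   // hypothetical path-A operator
        savepoint.put("proc4", "B-state-4");    // hypothetical path-B operators
        savepoint.put("proc5", "B-state-5");
        // New job: path A's operator has a new ID, path B's IDs are unchanged.
        List<String> newJob = List.of("procA-v2", "proc4", "proc5");
        System.out.println(restore(savepoint, newJob, true));
        // prints {proc4=B-state-4, proc5=B-state-5}
    }
}
```

With `allowNonRestoredState` set to `false`, the same call throws, which corresponds to the failure in the question.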

I investigated this issue and found that the solution would be the State Processor API, but I could not find any reference or usage guide on how to use it with AWS Managed Flink.
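The State Processor API works offline. A separate batch job reads an existing savepoint, lets you transform or drop per-operator state, and writes a new savepoint that the updated application is then started from. In recent Flink versions this is done with the `SavepointReader` and `SavepointWriter` classes. The sketch below is a rough, library-free illustration of that read, transform and write workflow. Plain Java maps stand in for savepoint state, and the operator IDs and the `v1:`/`v2:` format are hypothetical. It keeps path B's state as-is and converts path A's state to the new format under a new operator ID. How to get such a savepoint into AWS Managed Flink is not covered here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model of an offline savepoint migration (NOT the State Processor API itself).
final class SavepointMigration {

    /**
     * Copies every operator's state into a new savepoint, except that the state of
     * operator oldId is converted with convert and stored under newId.
     */
    static Map<String, String> migrate(Map<String, String> oldSavepoint,
                                       String oldId, String newId,
                                       Function<String, String> convert) {
        Map<String, String> newSavepoint = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : oldSavepoint.entrySet()) {
            if (e.getKey().equals(oldId)) {
                // Changed path A: convert the old format and re-key it for the new operator.
                newSavepoint.put(newId, convert.apply(e.getValue()));
            } else {
                // Unchanged path B: copy the state over untouched.
                newSavepoint.put(e.getKey(), e.getValue());
            }
        }
        return newSavepoint;
    }

    public static void main(String[] args) {
        Map<String, String> old = new LinkedHashMap<>();
        old.put("procA", "v1:42");
        old.put("proc4", "v1:7");
        // Hypothetical format change on path A: "v1:<n>" becomes "v2:<n>".
        Map<String, String> migrated =
                migrate(old, "procA", "procA-v2", s -> s.replace("v1:", "v2:"));
        System.out.println(migrated); // prints {procA-v2=v2:42, proc4=v1:7}
    }
}
```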


Solution

  • Judging by the error, you don't specify IDs for your operators, so the current ID of that operator, fb8f6c016d8c769a70a72e4189372e48, was generated by Flink.

    I suggest assigning an ID to each operator explicitly instead of letting Flink generate it, as it does now in your case.

    Here is the official documentation for reference:
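Why the explicit ID matters for the path-B state specifically: Flink derives auto-generated operator IDs from the structure of the job graph, so changing path A can also change the generated IDs of path-B operators, after which their saved state no longer matches. The toy model below is plain Java, not Flink's actual hashing, and all operator names are hypothetical. It reduces "structure" to an operator's position in the graph traversal to show the effect. In a real DataStream job the stable ID is set per operator with `.uid("...")`, e.g. `stream.map(...).uid("proc4")`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Flink's real hashing): an auto-generated ID depends on where the
// operator sits in the job graph, while an explicit uid is just the string you chose.
final class OperatorIds {

    /** Stand-in for an auto-generated ID: derived only from the traversal position. */
    static String generatedId(List<String> traversalOrder, String operator) {
        int position = traversalOrder.indexOf(operator);
        if (position < 0) {
            throw new IllegalArgumentException("unknown operator: " + operator);
        }
        return Integer.toHexString(("node#" + position).hashCode());
    }

    /** Maps each operator name to the ID it would get in this toy model. */
    static Map<String, String> ids(List<String> traversalOrder, boolean explicitUids) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String op : traversalOrder) {
            // With explicit uids, the chosen name itself is the stable ID.
            result.put(op, explicitUids ? op : generatedId(traversalOrder, op));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical traversal orders before and after adding an operator on path A.
        List<String> before = List.of("sourceA", "sourceB", "procA", "proc4", "proc5");
        List<String> after = List.of("sourceA", "sourceB", "procA", "procA-new", "proc4", "proc5");

        // The generated ID of path-B operator proc4 changes although path B was not touched...
        System.out.println(ids(before, false).get("proc4").equals(ids(after, false).get("proc4"))); // false
        // ...while its explicit uid survives the change.
        System.out.println(ids(before, true).get("proc4").equals(ids(after, true).get("proc4")));   // true
    }
}
```

Note that adding `.uid(...)` to an operator that previously had a generated ID also changes its ID once. Flink's `setUidHash(...)` exists for pinning an operator to its previous generated hash in that situation.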