
Is there any way to ensure that all CheckpointListeners are notified about checkpoint completion in Flink when a job is cancelled with a savepoint?


I'm using Flink 1.9 and the REST API /jobs/:jobid/savepoints to trigger the savepoint and cancel the job (that is, to stop the job gracefully so that it can be resumed later from the savepoint).

I use a two-phase commit in my source function, so the source implements both the CheckpointedFunction and CheckpointListener interfaces. In snapshotState() I snapshot the internal state, and in notifyCheckpointComplete() I commit that state to a third-party system.
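The lifecycle described above can be modeled without Flink as a minimal sketch, assuming that what gets committed is a source offset. `ExternalStore` and `TwoPhaseSourceModel` are hypothetical names, and their `snapshotState(long)` / `notifyCheckpointComplete(long)` methods only mirror the roles of the Flink callbacks; they are not the Flink interfaces. The model keeps one pending entry per checkpoint id and, on notification, commits the newest entry at or below the completed id, because a notification is not guaranteed to arrive for every checkpoint.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the third-party system (not a Flink class).
class ExternalStore {
    private long committedOffset = -1;

    // Idempotent commit: committing the same or an older offset again changes nothing.
    void commit(long offset) {
        committedOffset = Math.max(committedOffset, offset);
    }

    long committedOffset() {
        return committedOffset;
    }
}

// Plain-Java model of a two-phase-commit source; no Flink API is used.
class TwoPhaseSourceModel {
    private final ExternalStore store;
    private long currentOffset = 0;                               // how far the source has emitted
    private final TreeMap<Long, Long> pending = new TreeMap<>(); // checkpointId -> offset

    TwoPhaseSourceModel(ExternalStore store) {
        this.store = store;
    }

    void emit(int records) {
        currentOffset += records;
    }

    // Phase 1 (role of snapshotState): remember the offset belonging to this checkpoint.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, currentOffset);
    }

    // Phase 2 (role of notifyCheckpointComplete): commit the newest offset at or below the
    // completed checkpoint, then drop every entry that this checkpoint subsumes.
    void notifyCheckpointComplete(long checkpointId) {
        Map.Entry<Long, Long> entry = pending.floorEntry(checkpointId);
        if (entry != null) {
            store.commit(entry.getValue());
        }
        pending.headMap(checkpointId, true).clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Keying pending entries by checkpoint id, rather than keeping a single "last snapshot" field, is what lets a late or subsumed notification still commit the data that belongs to it.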

From what I can see in the source code, only the snapshotState() part is synchronous in CheckpointCoordinator:

    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }

The checkpoint acknowledgement and the completion notification, however, are asynchronous (see AsyncCheckpointRunnable).

As a result, when a savepoint is triggered with cancel-job set to true, some Task Managers manage to receive the completion notification and execute notifyCheckpointComplete() after the snapshot is taken but before the job is cancelled, and some do not.
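A deterministic model of this race shows the consequence and a common mitigation, assuming (hypothetically) two tasks whose pending commits are stored as part of the snapshotted state. Task `b` is cancelled before it is notified, so its data is committed only when it is restored from the savepoint. Re-committing restored pending data mirrors, as far as I know, what Flink's `TwoPhaseCommitSinkFunction` does in `initializeState()`. `CommitLog` and `RecoveringTask` are illustrative names, not Flink classes, and the commit has to be idempotent because task `a` already committed before the cancel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical third-party system that records committed entries (not a Flink class).
class CommitLog {
    final List<String> committed = new ArrayList<>();

    // Idempotent commit: an entry that is already committed is not added again.
    void commit(String entry) {
        if (!committed.contains(entry)) {
            committed.add(entry);
        }
    }
}

// Plain-Java model of one parallel task; the pending map is part of its snapshotted state.
class RecoveringTask {
    private final CommitLog log;
    private TreeMap<Long, String> pending = new TreeMap<>(); // checkpointId -> data to commit

    RecoveringTask(CommitLog log) {
        this.log = log;
    }

    // Role of snapshotState: record the pending data and return a copy of the state,
    // standing in for what would be written into the savepoint.
    TreeMap<Long, String> snapshotState(long checkpointId, String data) {
        pending.put(checkpointId, data);
        return new TreeMap<>(pending);
    }

    // Role of notifyCheckpointComplete: commit and drop everything up to this checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, String> done = pending.headMap(checkpointId, true);
        done.values().forEach(log::commit);
        done.clear();
    }

    // Role of a restore in initializeState: the restored state comes from a completed
    // savepoint, so any data still pending in it can be committed now.
    void restore(TreeMap<Long, String> restoredState) {
        restoredState.values().forEach(log::commit);
        pending = new TreeMap<>();
    }
}
```

Under this approach a missed notification only delays the commit until the job is resumed from the savepoint; it does not lose it.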

Is there a way to cancel a job with a savepoint such that notifyCheckpointComplete() is guaranteed to be invoked on all Task Managers before the job is cancelled, or is there currently no way to achieve this?


Solution

  • Wouldn't using stop-with-savepoint [1][2] solve the problem?

    [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
    [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
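A sketch of calling stop-with-savepoint through the REST endpoint from [1], under these assumptions: Java 11+ (`java.net.http`), a JobManager REST address passed in by the caller, and the request fields `targetDirectory` and `drain` as I understand them from the Flink 1.9 REST docs (please verify against [1] for your version). As far as I know, this path takes the synchronous-savepoint branch (`props.isSynchronous()`) quoted in the question, where tasks wait for the completion notification before shutting down. `StopWithSavepoint`, `stopUri` and `stopBody` are hypothetical helper names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for POST /jobs/:jobid/stop (stop-with-savepoint).
class StopWithSavepoint {

    // Builds e.g. http://localhost:8081/jobs/<jobId>/stop
    static URI stopUri(String restBase, String jobId) {
        return URI.create(restBase + "/jobs/" + jobId + "/stop");
    }

    // drain=false stops without emitting MAX_WATERMARK, which suits "resume later from savepoint".
    static String stopBody(String targetDirectory, boolean drain) {
        String escaped = targetDirectory.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"targetDirectory\":\"" + escaped + "\",\"drain\":" + drain + "}";
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("usage: StopWithSavepoint <restBase> <jobId> <targetDirectory>");
            return;
        }
        HttpRequest request = HttpRequest.newBuilder(stopUri(args[0], args[1]))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(stopBody(args[2], false)))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The body should contain a trigger id to poll at /jobs/:jobid/savepoints/:triggerid.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

From the command line [2], the 1.9 equivalent should be `bin/flink stop -p <targetDirectory> <jobId>`.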