I'm using Flink 1.9 and the REST API endpoint /jobs/:jobid/savepoints to trigger a savepoint and cancel the job (i.e. stop the job gracefully so it can later be resumed from the savepoint).
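The trigger request described above can be sketched with the JDK 11 java.net.http client. This is a minimal sketch, assuming the JobManager's REST endpoint is at http://localhost:8081 and using a hypothetical savepoint directory file:///tmp/savepoints. The body fields target-directory and cancel-job follow the Flink REST API documentation for POST /jobs/:jobid/savepoints. The directory is inserted without JSON escaping, which is only safe for plain paths.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of "trigger savepoint and cancel job" via POST /jobs/:jobid/savepoints.
// Assumptions: REST endpoint at http://localhost:8081; savepoint directory is hypothetical.
class SavepointCancelRequest {

    // JSON body with the hyphenated field names used by this endpoint.
    static String body(String targetDirectory, boolean cancelJob) {
        return "{\"target-directory\":\"" + targetDirectory + "\","
                + "\"cancel-job\":" + cancelJob + "}";
    }

    static HttpRequest build(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, true)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: SavepointCancelRequest <jobId>");
            return;
        }
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                build("http://localhost:8081", args[0], "file:///tmp/savepoints"),
                HttpResponse.BodyHandlers.ofString());
        // On success the body carries a "request-id" that can be polled at
        // /jobs/:jobid/savepoints/:triggerid
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```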
I use a two-phase commit in my source function, so the source implements both the CheckpointedFunction and CheckpointListener interfaces. In snapshotState() I snapshot the internal state, and in notifyCheckpointComplete() I commit the checkpointed state to a third-party system.
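The two phases can be illustrated with a small Flink-free model. This is a hypothetical sketch: TwoPhaseSourceModel only borrows the callback names of CheckpointedFunction and CheckpointListener with simplified signatures (it is not the Flink API), and the committed list stands in for the third-party system. It shows why the commit hinges on the notification: a snapshot that never receives notifyCheckpointComplete() stays pending and is never committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of a two-phase-commit source. Method names mirror
// CheckpointedFunction#snapshotState and CheckpointListener#notifyCheckpointComplete,
// but the signatures are simplified; this is not the Flink API.
class TwoPhaseSourceModel {
    private long position;                                   // e.g. a read offset
    private final Map<Long, Long> pending = new TreeMap<>(); // checkpointId -> snapshotted position
    final List<Long> committed = new ArrayList<>();          // stands in for the third-party system

    void emit(int records) {
        position += records;
    }

    // Phase 1: called when the checkpoint/savepoint is taken on this task.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, position);
    }

    // Phase 2: called only after the checkpoint is reported complete.
    void notifyCheckpointComplete(long checkpointId) {
        Long snapshotted = pending.remove(checkpointId);
        if (snapshotted != null) {
            committed.add(snapshotted);
        }
        // Older pending snapshots are subsumed by this one and can be dropped.
        pending.keySet().removeIf(id -> id < checkpointId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TwoPhaseSourceModel source = new TwoPhaseSourceModel();
        source.emit(10);
        source.snapshotState(1);
        source.emit(5);
        source.snapshotState(2);
        source.notifyCheckpointComplete(2);
        System.out.println(source.committed + " pending=" + source.pendingCount()); // [15] pending=0

        // If the job is cancelled before notifyCheckpointComplete(3) arrives,
        // the snapshot for checkpoint 3 is never committed.
        source.emit(7);
        source.snapshotState(3);
        System.out.println(source.committed + " pending=" + source.pendingCount()); // [15] pending=1
    }
}
```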
From what I can see in the source code, only the snapshotState() part is synchronous in CheckpointCoordinator:
```java
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
```
The checkpoint acknowledgement and the completion notification are handled asynchronously in AsyncCheckpointRunnable.
That said, when a savepoint is triggered with cancel-job set to true, some of the Task Managers still receive the completion notification after the snapshot is taken but before the job is cancelled, and execute notifyCheckpointComplete(), while others do not.
So the question is whether there is a way to cancel a job with a savepoint such that notifyCheckpointComplete() is guaranteed to be invoked on all Task Managers before the job is cancelled, or whether this cannot be achieved at the moment.
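Wouldn't using stop-with-savepoint [1][2] solve the problem? It triggers a synchronous savepoint (the triggerSynchronousSavepoint() branch in the snippet above), in which each task, after taking its snapshot, waits for the checkpoint-complete notification before it finishes.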
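A stop-with-savepoint request against the REST endpoint in [1] can be sketched with the JDK 11 HTTP client, again assuming the REST endpoint is at http://localhost:8081 and a hypothetical savepoint directory. The body fields targetDirectory and drain follow the Flink 1.9 REST documentation; note that they are camelCase, unlike the hyphenated fields of /jobs/:jobid/savepoints. drain is left false here because setting it emits MAX_WATERMARK before the savepoint (firing pending event-time timers), which is meant for terminating a job rather than resuming it later.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of stop-with-savepoint via POST /jobs/:jobid/stop.
// Assumptions: REST endpoint at http://localhost:8081; savepoint directory is hypothetical.
class StopWithSavepointRequest {

    // Body fields "targetDirectory" and "drain"; the path is not JSON-escaped.
    static String body(String targetDirectory, boolean drain) {
        return "{\"targetDirectory\":\"" + targetDirectory + "\",\"drain\":" + drain + "}";
    }

    static HttpRequest build(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                // drain=false: take the savepoint without first emitting MAX_WATERMARK
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, false)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: StopWithSavepointRequest <jobId>");
            return;
        }
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                build("http://localhost:8081", args[0], "file:///tmp/savepoints"),
                HttpResponse.BodyHandlers.ofString());
        // On success the body contains a "request-id" for tracking the savepoint.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

From the command line, the rough counterpart is flink stop -p file:///tmp/savepoints <jobId>, as described in the CLI documentation [2].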
[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html