I have 2 questions related to high availability of a StateFun application running on Kubernetes
Here are details about my setup:
1- I tried both Zookeeper and Kubernetes HA settings, result is the same (log below is from a Zookeeper HA env). When I kill the jobmanager pod, minikube starts another pod and this new pod fails when it tries to load last checkpoint:
...
2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}' with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms
2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
2021-12-11 14:25:26,712 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,724 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 1 checkpoints from storage.
2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 2.
2021-12-11 14:25:26,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 00000000000000000000000000000000 located at hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
2021-12-11 14:25:27,012 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed.
at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 2edd7b5dafb2c271440b25f6da5f4532
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state 2edd7b5dafb2c271440b25f6da5f4532
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-12-11 14:25:27,017 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2021-12-11 14:25:27,021 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2021-12-11 14:25:27,034 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}
I believe not being able to specify ids for Flink operators (as told here) when using StateFun is causing this. While it was working fine in the beginning, operators got some random id assigned and checkpointing went just fine. After the restart, the operators are assigned other random ids, and when the jobmanager (statefun master in this case) tries to load the state "2edd7b5dafb2c271440b25f6da5f4532" it fails to find the operator assigned to it originally.
Can someone confirm what I think is correct and / or give me directions for making my StateFun app work with high availability?
Another interesting thing to note is, after several restarts of the jobmanager pod with the above exception, it sometimes can get past the "Restoring job 00000000000000000000000000000000 from Checkpoint ..." line somehow (?), with "No master state to restore" log (link) - which makes me feel not sure about it really did recover or it just started discarding the state on last successful checkpoint. What might be causing this? Is it really recovering from the checkpoint successfully?
2- For Kubernetes deployments, on StateFun deployment documentation (link) Deployment type is used for jobmanager component. On the other hand Flink deployment documentation (Standalone / Kubernetes section) (link) uses Job type for jobmanager for high available setup (jobmanager-application-ha.yaml file)
Basically since Kubernetes will restart the pod on failures, either Job or Deployment can be used. But the thing is, when we try to stop the job with a savepoint and Deployment type is used, Kubernetes restarts the pod regardless of successful savepoint creation and success exit status (0).
Are we supposed not to stop StateFun apps with savepoint when running on Kubernetes? I am aware of the related bug (link) - but although it seems to be deprecated I can do a cancel with savepoint - are we supposed to just delete deployment as told in High availability data clean up section? (link)
UPDATE for the first question: I turned on debug logging and could capture a session with the exception and a successful startup in a row. The following is from the unsuccessful one:
...
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: }
and this is from the successful one:
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '2edd7b5dafb2c271440b25f6da5f4532' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: }
It seems that generated hashes between two runs are computed differently.
In statefun <= 3.2 routers do not have manually specified UIDs. While Flinks internal UID generation is deterministic, the way statefun generates the underlying stream graph may not be in some cases. This is a bug. I've opened a PR to fix this in a backwards compatible way[1].