apache-flinkflink-streamingflink-ceppyflinkflink-batch

The configuration does not specify the checkpoint directory 'state.checkpoints.dir'


While submitting the flink job on the dataproc cluster getting the below error. Please find the code base and the error. I am using the flink 1.9.3 version.

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: f064ceaa5b318fdad9a77b2723b9ee64)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
        at org.flink.ReadFromPubsub.main(ReadFromPubsub.java:28)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
        at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:376)
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
        ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:303)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
        at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
        ... 10 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the RocksDB state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:44)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
        at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:154)
        at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:219)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:299)
        ... 20 more

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        ... 4 more

Code snippet I executed

public class ReadFromPubsub
{

    public static void main(String args[]) throws Exception 
    {
        System.out.println("Flink Pubsub Code Read 1");
        
        
        
        StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
       
        
        
        
        System.out.println("Flink Pubsub Code Read 2");
        DeserializationSchema<String> deserializer = new SimpleStringSchema();
        System.out.println("Flink Pubsub Code Read 3");
        SourceFunction<String> pubsubSource = PubSubSource.newBuilder()
                                                            .withDeserializationSchema(deserializer)
.withProjectName("vz-it-np-gudv-dev-vzntdo-0")
                                                           .withSubscriptionName("subscription1")
                                                              .build();
        System.out.println("Flink Pubsub Code Read 4");
        streamExecEnv.addSource(pubsubSource).print();
        streamExecEnv.enableCheckpointing(10);
        System.out.println("Flink Pubsub Code Read 5");
        streamExecEnv.execute();
    }
}

I can see all the print statements during the execution of the code. After the last print statement I am getting the error.

All the exceptions to be resolved


Solution

  • Generally, you would supply the appropriate savepoint/checkpointing directory within your Flink configuration within flink-conf.yaml as detailed in the docs. If you aren't currently setting it, you can do so in the following ways:

    Via flink-conf.yaml (Preferred)

    state.checkpoints.dir: "file://example/checkpoints"
    

    Programmatically (within the job):

    Configuration configuration = new Configuration();
    conf.setString("state.checkpoints.dir", "file://example/checkpoints");
    
    StreamExecutionEnvironment streamExecEnv = 
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    

    Via the CLI:

    flink run -Dstate.checkpoints.dir="/example/checkpoints" your-job.jar
    

    Additionally, if you didn't actually want to perform checkpointing, you could likely remove the following configuration within your job:

    streamExecEnv.enableCheckpointing(10);