I am trying to set up Apache Zeppelin and Apache Flink using docker-compose.
I can get Zeppelin to work well with Flink in local mode (where Zeppelin spawns a local Flink cluster), but not in remote mode (where Zeppelin connects to an existing cluster).
I settled on Flink 1.12.0 because it is the only version I can get working with Zeppelin in local mode.
I use the following docker-compose.yml, which is largely inspired by the Flink documentation: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/
```yaml
version: "2.2"
services:
  jobmanager:
    image: flink:1.12.0
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.12.0
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  zeppelin:
    image: apache/zeppelin:0.10.1
    hostname: zeppelin
    depends_on:
      - taskmanager
    container_name: zeppelin
    ports:
      - "9080:8080"
      - "9081:8081"
    volumes:
      - /home/administrator/flink/flink-1.12.0:/opt/flink
    environment:
      FLINK_HOME: /opt/flink
```
I use the Zeppelin UI on port 9081 to configure the Flink interpreter for the remote cluster, with these parameters:

```
flink.execution.mode: remote
flink.execution.remote.host: jobmanager
flink.execution.remote.port: 8081
```
I get this error when executing the Batch WordCount cell of the "Flink Basics" tutorial notebook:
```text
data: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@6adbed6a
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c55932eec9cc32b8df64630be00b2532)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:379)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:117)
    ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: org.apache.flink.api.scala.typeutils.CaseClassTypeInfo; local class incompatible: stream classdesc serialVersionUID = -3156538608383968041, local class serialVersionUID = -8170136754691974512
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:110)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.api.scala.typeutils.CaseClassTypeInfo; local class incompatible: stream classdesc serialVersionUID = -3156538608383968041, local class serialVersionUID = -8170136754691974512
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
    at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1464)
    at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.setup(SynchronousChainedCombineDriver.java:90)
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:91)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1333)
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:107)
    ... 3 more
Caused by: java.io.InvalidClassException: org.apache.flink.api.scala.typeutils.CaseClassTypeInfo; local class incompatible: stream classdesc serialVersionUID = -3156538608383968041, local class serialVersionUID = -8170136754691974512
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    ... 9 more
```
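The most informative line is the innermost `java.io.InvalidClassException`. In Java serialization, the "stream classdesc" UID comes from the bytes that were written. In this setup those bytes are presumably written by the Flink client inside Zeppelin. The "local class" UID comes from the class on the reading JVM's classpath, which here is the TaskManager. Different values therefore mean that the two sides loaded different builds of `CaseClassTypeInfo`. The sketch below uses a hypothetical `find_uid_mismatch` helper to extract the class name and both UIDs from a log, so the client and the cluster can be compared:

```python
import re
from typing import NamedTuple, Optional


class UidMismatch(NamedTuple):
    class_name: str
    stream_uid: int  # UID recorded in the serialized bytes (the writer's class)
    local_uid: int   # UID of the class on the reading JVM's classpath


# Matches the standard InvalidClassException message for a serialVersionUID mismatch.
_PATTERN = re.compile(
    r"InvalidClassException: (?P<cls>[\w.$]+); local class incompatible: "
    r"stream classdesc serialVersionUID = (?P<stream>-?\d+), "
    r"local class serialVersionUID = (?P<local>-?\d+)"
)


def find_uid_mismatch(log_text: str) -> Optional[UidMismatch]:
    """Return the first serialVersionUID mismatch found in log_text, or None."""
    m = _PATTERN.search(log_text)
    if m is None:
        return None
    return UidMismatch(m.group("cls"), int(m.group("stream")), int(m.group("local")))
```

Run against the trace above, this reports `org.apache.flink.api.scala.typeutils.CaseClassTypeInfo`, which lives in Flink's Scala API. That is why a Scala version mismatch is a natural first suspect.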
I looked in Flink's log file for other ports to try instead of 8081; none of them worked.
I tried other versions of Flink, but got different errors and could not even get Zeppelin working in local mode.
Any suggestions?
Many thanks, Michel
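I managed to get Apache Zeppelin to use a remote Apache Flink cluster by explicitly specifying the Scala version (2.11) in the tag of the Flink image to pull:

`flink:1.14.6-scala_2.11-java11`

This is consistent with the error above. A `serialVersionUID` mismatch on `org.apache.flink.api.scala.typeutils.CaseClassTypeInfo` means that the Zeppelin side (which serialized the job) and the TaskManager (which deserialized it) were not using the same build of Flink's Scala API. That is what you would expect if they were built against different Scala versions.

I also upgraded to a more recent version of Apache Flink (1.14.6). I understand 1.14.6 is the last version of Flink that ships with Scala 2.11, since Flink 1.15 dropped Scala 2.11 support.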
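Both sides need to agree on the Scala version: the cluster image and the Flink distribution mounted as `FLINK_HOME` in the Zeppelin container. A small check can compare the two. This is a minimal sketch with hypothetical helper names. It assumes that the distribution's `lib/` directory contains a jar named `flink-dist_<scala>-<version>.jar`, which as far as I know holds for Flink 1.14 and earlier. It also assumes that the image tag carries an explicit `-scala_<x.y>` suffix.

```python
import re
from pathlib import Path
from typing import Optional, Tuple

# Image tags such as "flink:1.14.6-scala_2.11-java11" (explicit Scala suffix assumed)
_IMAGE_TAG = re.compile(r"flink:(?P<flink>\d+(?:\.\d+)*)-scala_(?P<scala>\d+\.\d+)")
# Jar names such as "flink-dist_2.11-1.14.6.jar" (assumed naming for Flink <= 1.14)
_DIST_JAR = re.compile(r"^flink-dist_(?P<scala>\d+\.\d+)-(?P<flink>\d+(?:\.\d+)*)\.jar$")


def versions_from_image(image: str) -> Optional[Tuple[str, str]]:
    """Return (flink_version, scala_version) from an image tag, or None if no Scala suffix."""
    m = _IMAGE_TAG.search(image)
    return (m.group("flink"), m.group("scala")) if m else None


def versions_from_dist_jar(jar_name: str) -> Optional[Tuple[str, str]]:
    """Return (flink_version, scala_version) from a flink-dist jar file name, or None."""
    m = _DIST_JAR.match(jar_name)
    return (m.group("flink"), m.group("scala")) if m else None


def versions_from_flink_home(flink_home: str) -> Optional[Tuple[str, str]]:
    """Scan <flink_home>/lib for a flink-dist jar; None if the directory or jar is missing."""
    for jar in sorted(Path(flink_home, "lib").glob("flink-dist_*.jar")):
        found = versions_from_dist_jar(jar.name)
        if found is not None:
            return found
    return None


def check_scala_match(image: str, flink_home: str) -> bool:
    """True only if the image tag and FLINK_HOME name the same Flink and Scala versions."""
    from_image = versions_from_image(image)
    return from_image is not None and from_image == versions_from_flink_home(flink_home)
```

For example, run `check_scala_match("flink:1.14.6-scala_2.11-java11", "/home/administrator/flink/flink-1.14.6")` on the Docker host. It should only return `True` if the mounted distribution is the Scala 2.11 build. A bare tag like `flink:1.12.0` yields `False`, because the check cannot tell which Scala version it contains.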
Here is the docker-compose.yml that works for me:
```yaml
version: "2.2"
services:
  jobmanager:
    image: flink:1.14.6-scala_2.11-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.6-scala_2.11-java11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  zeppelin:
    image: apache/zeppelin:0.10.1
    hostname: zeppelin
    depends_on:
      - taskmanager
    container_name: zeppelin
    ports:
      - "9080:8080"
      - "9081:8081"
    volumes:
      - /home/administrator/flink/flink-1.14.6:/opt/flink
    environment:
      FLINK_HOME: /opt/flink
```
Apache Zeppelin is accessible at http://<server_ip>:9081, and its Flink interpreter is configured with these settings:

```
flink.execution.mode remote
flink.execution.remote.host jobmanager
flink.execution.remote.port 8081
```
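`flink.execution.remote.port` should be the JobManager's REST port, which defaults to 8081 and also serves the web UI. One quick way to confirm that the host and port are right is to call the REST API's `/config` endpoint. Its JSON response includes a `flink-version` field. This is a minimal sketch using only the Python standard library. The helper names are hypothetical, and it assumes the REST endpoint is reachable without authentication.

```python
import json
import urllib.request


def parse_flink_version(body: str) -> str:
    """Extract the "flink-version" field from the JSON body returned by GET /config."""
    return json.loads(body)["flink-version"]


def fetch_flink_version(host: str = "jobmanager", port: int = 8081, timeout: float = 5.0) -> str:
    """Query the JobManager REST endpoint that flink.execution.remote.host/port point at."""
    url = f"http://{host}:{port}/config"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_flink_version(resp.read().decode("utf-8"))
```

Call `fetch_flink_version()` from a container on the Compose network, where `jobmanager` resolves. From the Docker host, call `fetch_flink_version("localhost")` instead, since the compose file publishes port 8081. The returned version should match the distribution mounted as `FLINK_HOME`.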