apache-spark pyspark apache-spark-sql apache-zookeeper

Spark Executor Fails to Connect to Driver in Cluster Standalone mode: "Connection refused: hostname/ip:randomport"


I'm setting up a Spark cluster (standalone mode) with ZooKeeper for high availability. I have 2 master nodes (s1, s2) and 3 worker nodes (s3, s4, s5). When I try to run a Spark job (even a simple spark-shell command), I get the following error in the executor logs (Failed to connect to client-host/client-ip:random-port):

java.io.IOException: Failed to connect to s1/10.1.1.21:45407
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: s1/10.1.1.21:45407
Caused by: java.net.ConnectException: Connection refused

It seems that the executor on the worker node cannot establish a connection to the driver on the master node (s1) at port 45407.
All nodes can communicate with each other. There is no firewall, and a port opened on any node can be reached from every other node.
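One way to double-check the reachability claim for the exact address in the error is a small probe run from a worker. This is only a sketch: `can_connect` is a hypothetical helper, not part of Spark, and it assumes bash (for `/dev/tcp`) and the coreutils `timeout` command are available. Note that "Connection refused" usually means the host answered but nothing was listening on that address and port at that moment, so the probe is only meaningful while the driver is still running on its (random) port.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not a Spark tool): returns 0 if a TCP connection to
# host:port can be opened within the given number of seconds, non-zero otherwise.
can_connect() {
  local host="$1" port="$2" secs="${3:-3}"
  # /dev/tcp/<host>/<port> is a bash redirection feature; the inner shell
  # receives host as $0 and port as $1.
  timeout "$secs" bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Example, run on a worker such as s3 while the driver is alive:
#   getent hosts s1    # which address does the name s1 resolve to on this node?
#   can_connect s1 45407 && echo "reachable" || echo "refused or timed out"
```

If the probe succeeds against a manually opened port but fails against the driver port, the network itself is probably fine and the driver is either gone or listening on a different address.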

My Configuration:

Bash

JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64
SPARK_HOME=/opt/spark
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=s1:12181,s2:12181,s3:12181,s4:12181,s5:12181 -Dspark.deploy.zookeeper.dir=/spark"
SPARK_MASTER_HOST=10.1.1.21 # on s1; on s2 it is 10.1.1.22
SPARK_MASTER_PORT=17701
SPARK_MASTER_WEBUI_PORT=18021
SPARK_WORKER_CORES=2 # commented out on master nodes
SPARK_WORKER_MEMORY=10g # commented out on master nodes

Executor log:

Spark Executor Command: "/usr/lib/jvm/java-1.17.0-openjdk-amd64/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx7168M" "-Dspark.driver.port=45407" "-Djava.net.preferIPv6Addresses=false" "-XX:+IgnoreUnrecognizedVMOptions" "--add-opens=java.base/java.lang=ALL-UNNAMED" "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" "--add-opens=java.base/java.io=ALL-UNNAMED" "--add-opens=java.base/java.net=ALL-UNNAMED" "--add-opens=java.base/java.nio=ALL-UNNAMED" "--add-opens=java.base/java.util=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED" "--add-opens=java.base/sun.security.action=ALL-UNNAMED" "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" "-Djdk.reflect.useDirectMethodHandle=false" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@s1:45407" "--executor-id" "20" "--hostname" "10.1.1.23" "--cores" "2" "--app-id" "app-20240822113544-0001" "--worker-url" "spark://Worker@10.1.1.23:17701" "--resourceProfileId" "0"
========================================

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
24/08/22 11:36:04 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 5122@s3
24/08/22 11:36:04 INFO SignalUtils: Registering signal handler for TERM
24/08/22 11:36:04 INFO SignalUtils: Registering signal handler for HUP
24/08/22 11:36:04 INFO SignalUtils: Registering signal handler for INT
24/08/22 11:36:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/22 11:36:05 INFO SecurityManager: Changing view acls to: x
24/08/22 11:36:05 INFO SecurityManager: Changing modify acls to: x
24/08/22 11:36:05 INFO SecurityManager: Changing view acls groups to: 
24/08/22 11:36:05 INFO SecurityManager: Changing modify acls groups to: 
24/08/22 11:36:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: x; groups with view permissions: EMPTY; users with modify permissions: x; groups with modify permissions: EMPTY
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1894)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:429)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:418)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:449)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
    at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:896)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:447)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
    at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    ... 4 more
Caused by: java.io.IOException: Failed to connect to s1/10.1.1.21:45407
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:294)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:226)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: s1/10.1.1.21:45407
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.Net.pollConnect(Native Method)
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)

Spark Submit (--verbose):

x@s1:~$ /opt/spark/bin/spark-submit --master spark://10.1.1.21:17701 --deploy-mode cluster --verbose --name OWordCount --class WordCount wc.jar 
Using properties file: null
24/08/23 00:15:29 WARN Utils: Your hostname, s1 resolves to a loopback address: 127.0.1.1; using 10.1.1.21 instead (on interface ens33)
24/08/23 00:15:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Parsed arguments:
  master                  spark://10.1.1.21:17701
  remote                  null
  deployMode              cluster
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          null
  driverMemory            null
  driverCores             null
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               WordCount
  primaryResource         file:/home/x/wc.jar
  name                    OWordCount
  childArgs               []
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true

Spark properties used, including those specified through
 --conf and those from the properties file null:
  

    
Main class:
org.apache.spark.deploy.ClientApp
Arguments:
launch
spark://10.1.1.21:17701
file:/home/x/wc.jar
WordCount
Spark config:
(spark.app.name,OWordCount)
(spark.app.submitTime,1724368529780)
(spark.driver.supervise,false)
(spark.jars,file:/home/x/wc.jar)
(spark.master,spark://10.1.1.21:17701)
(spark.submit.deployMode,cluster)
(spark.submit.pyFiles,)
Classpath elements:



24/08/23 00:15:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/23 00:15:30 INFO SecurityManager: Changing view acls to: x
24/08/23 00:15:30 INFO SecurityManager: Changing modify acls to: x
24/08/23 00:15:30 INFO SecurityManager: Changing view acls groups to: 
24/08/23 00:15:30 INFO SecurityManager: Changing modify acls groups to: 
24/08/23 00:15:30 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: x; groups with view permissions: EMPTY; users with modify permissions: x; groups with modify permissions: EMPTY
24/08/23 00:15:30 INFO Utils: Successfully started service 'driverClient' on port 45661.
24/08/23 00:15:30 INFO TransportClientFactory: Successfully created connection to /10.1.1.21:17701 after 70 ms (0 ms spent in bootstraps)
24/08/23 00:15:30 INFO ClientEndpoint: ... waiting before polling master for driver state
24/08/23 00:15:31 INFO ClientEndpoint: Driver successfully submitted as driver-20240823001530-0001
24/08/23 00:15:35 INFO ClientEndpoint: State of driver-20240823001530-0001 is FAILED
24/08/23 00:15:35 INFO ClientEndpoint: State of driver driver-20240823001530-0001 is FAILED, exiting spark-submit JVM.
24/08/23 00:15:36 INFO ShutdownHookManager: Shutdown hook called
24/08/23 00:15:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-0143cf49-36cb-4dde-b528-b973d0f506e4

spark-submit ends with the same stderr as spark-shell: Failed to connect ... Connection refused ...

Things I've checked:

Questions


Solution

  • Updating spark-env.conf to remove the master-related settings from the workers and the worker-related settings from the masters resolved the issue.
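
As a rough illustration of that split, the two variants of the environment file might look like the sketch below (the post calls the file spark-env.conf; a stock Spark install reads conf/spark-env.sh). The values are copied from the configuration in the question. Which node types keep SPARK_DAEMON_JAVA_OPTS is an assumption here: the post does not say, and this sketch keeps the ZooKeeper recovery options on the masters only.

```shell
# --- Environment file on master nodes (s1, s2): master and recovery settings only ---
JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64
SPARK_HOME=/opt/spark
# Assumption: ZooKeeper recovery options are kept on masters only.
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=s1:12181,s2:12181,s3:12181,s4:12181,s5:12181 -Dspark.deploy.zookeeper.dir=/spark"
SPARK_MASTER_HOST=10.1.1.21 # on s2: 10.1.1.22
SPARK_MASTER_PORT=17701
SPARK_MASTER_WEBUI_PORT=18021
# No SPARK_WORKER_* entries on masters.

# --- Environment file on worker nodes (s3, s4, s5): worker settings only ---
# JAVA_HOME and SPARK_HOME as above; no SPARK_MASTER_* entries on workers.
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=10g
```

With a layout like this, workers would typically be pointed at both masters when they are started, for example with a master URL of the form spark://s1:17701,s2:17701, so that they can re-register after a failover.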