It's possible to configure the Beam portable runner with the spark configurations? More precisely, it's possible to configure the spark.driver.host
in the Portable Runner?
Currently, we have airflow implemented in a Kubernetes cluster, and aiming to use TensorFlow Extended we need to use Apache beam. For our use case Spark would be the appropriate runner to be used, and as airflow and TensorFlow are coded in python we would need to use the Apache Beam's Portable Runner (https://beam.apache.org/documentation/runners/spark/#portability).
The portable runner creates the spark context inside its container and does not leave space for the driver DNS configuration making the executors inside the worker pods non-communicable to the driver (the job server).
- name: beam-spark-job-server
image: apache/beam_spark_job_server:2.27.0
args: ["--spark-master-url=spark://spark-master:7077"]
Job server/airflow service:
apiVersion: v1
kind: Service
metadata:
name: airflow-scheduler
labels:
app: airflow-k8s
spec:
type: ClusterIP
selector:
app: airflow-scheduler
ports:
- port: 8793
protocol: TCP
targetPort: 8793
name: scheduler
- port: 8099
protocol: TCP
targetPort: 8099
name: job-server
- port: 7077
protocol: TCP
targetPort: 7077
name: spark-master
- port: 8098
protocol: TCP
targetPort: 8098
name: artifact
- port: 8097
protocol: TCP
targetPort: 8097
name: java-expansion
The ports 8097,8098 and 8099 are related to the job server, 8793 to airflow, and 7077 to the spark master.
python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
from the airflow container I get the following response on the airflow pod:Defaulting container name to airflow-scheduler.
Use 'kubectl describe pod/airflow-scheduler-local-f685b5bc7-9d7r6 -n airflow-main-local' to see all of the containers in this pod.
airflow@airflow-scheduler-local-f685b5bc7-9d7r6:/opt/airflow$ python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:35837
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
with Pipeline() as p:
p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
And the worker log:
21/02/19 19:50:00 INFO Worker: Asked to launch executor app-20210219194804-0000/47 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:00 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing view acls groups to:
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls groups to:
21/02/19 19:50:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:00 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "47" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:02 INFO Worker: Executor app-20210219194804-0000/47 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 47
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=47)
21/02/19 19:50:02 INFO Worker: Asked to launch executor app-20210219194804-0000/48 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:02 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing view acls groups to:
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls groups to:
21/02/19 19:50:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "48" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:04 INFO Worker: Executor app-20210219194804-0000/48 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 48
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=48)
21/02/19 19:50:04 INFO Worker: Asked to launch executor app-20210219194804-0000/49 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:04 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing view acls groups to:
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls groups to:
21/02/19 19:50:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:04 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "49" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
.
.
.
As we can see, the executor is being exited constantly, and by what I know this issue is created by the missing communication between the executor and the driver (the job server in this case). Also, the "--driver-url" is translated to the driver pod name using the random port "-Dspark.driver.port".
As we can't define the name of the service, the worker tries to use the original name from the driver and to use a randomly generated port. As the configuration comes from the driver, changing the default conf files in the worker/master doesn't create any results.
Using this answer as an example, I tried to use the env variable SPARK_PUBLIC_DNS
in the job server but this didn't result in any changes in the worker logs.
Using directly in kubernetes a spark job
kubectl run spark-base --rm -it --labels="app=spark-client" --image bde2020/spark-base:2.4.5-hadoop2.7 -- bash ./spark/bin/pyspark --master spark://spark-master:7077 --conf spark.driver.host=spark-client
having the service:
apiVersion: v1
kind: Service
metadata:
name: spark-client
spec:
selector:
app: spark-client
clusterIP: None
I get a full working pyspark shell. If I omit the --conf parameter I get the same behavior as the first setup (exiting executors indefinitely)
21/02/19 20:21:02 INFO Worker: Executor app-20210219202050-0002/4 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 4
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219202050-0002, execId=4)
21/02/19 20:21:02 INFO Worker: Asked to launch executor app-20210219202050-0002/5 for Spark shell
21/02/19 20:21:02 INFO SecurityManager: Changing view acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing view acls groups to:
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls groups to:
21/02/19 20:21:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 20:21:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=46161" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-base:46161" "--executor-id" "5" "--hostname" "172.18.0.20" "--cores" "1" "--app-id" "app-20210219202050-0002" "--worker-url" "spark://Worker@172.18.0.20:45151"
I have three solutions to choose from depending on your deployment requirements. In order of difficulty:
beam_spark_job_server
container at all.python -m apache_beam.examples.wordcount \
--output ./data_test/ \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--environment_type=LOOPBACK
You can pass the properties through a Spark configuration file. Create the Spark configuration file, and add spark.driver.host
and whatever other properties you need. In the docker run
command for the job server, mount that configuration file to the container, and set the SPARK_CONF_DIR
environment variable to point to that directory.
If that neither of those work for you, you can alternatively build your own customized version of the job server container. Pull Beam source from Github. Check out the release branch you want to use (e.g. git checkout origin/release-2.28.0
). Modify the entrypoint spark-job-server.sh and set -Dspark.driver.host=x
there. Then build the container using ./gradlew :runners:spark:job-server:container:docker -Pdocker-repository-root="your-repo" -Pdocker-tag="your-tag"
.