I have an Apache Spark cluster running on a Docker Compose network that needs to connect to an AWS RDS instance (PostgreSQL, more specifically). The following is the compose.yml file I am using to define the Spark cluster:
services:
  spark-master:
    build: .
    command: bin/spark-class org.apache.spark.deploy.master.Master
    expose:
      - 8080
      - 4040
      - 7077
    ports:
      - "9095:8080"
      - "7777:7077"
      - "4040:4040"
    environment:
      SPARK_PUBLIC_DNS: localhost
    volumes:
      - ./spark_jobs:/opt/bitnami/spark/spark_jobs
      - ./utility:/opt/bitnami/spark/utility
      - type: bind
        source: ./aws_rds_session.sh
        target: /opt/bitnami/spark/aws_rds_session.sh
      - type: bind
        source: ./spark_submit.sh
        target: /opt/bitnami/spark/spark_submit.sh
    # loading environment variables
    env_file:
      - ./.env
    extra_hosts:
      - "host.docker.internal:host-gateway"
    networks:
      - spark-cluster-network
  spark-worker-1:
    build: .
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
      SPARK_PUBLIC_DNS: localhost
    env_file:
      - ./.env
    extra_hosts:
      - "host.docker.internal:host-gateway"
    networks:
      - spark-cluster-network
    ports:
      - "4041:4040"
    expose:
      - 4040
  spark-worker-2:
    build: .
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
      SPARK_PUBLIC_DNS: localhost
    env_file:
      - ./.env
    extra_hosts:
      - "host.docker.internal:host-gateway"
    networks:
      - spark-cluster-network
    ports:
      - "4042:4040"
    expose:
      - 4040
networks:
  spark-cluster-network:
    driver: bridge
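For reference, I bring the cluster up with the usual Compose commands, along these lines:
# build the custom image and start the master and both workers (detached)
docker compose up --build -d
# confirm all three services are running
docker compose ps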
The custom image I am using is built from the following Dockerfile:
FROM bitnami/spark:latest
USER root
# installing some dependencies
RUN apt-get update && apt-get install -y curl zip unzip
# install AWS CLI
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
./aws/install
# install AWS session manager plugin
RUN curl "https://s3.amazonaws.com/session-manager-downloads/plugin/latest/ubuntu_64bit/session-manager-plugin.deb" -o "session-manager-plugin.deb" && \
apt-get update && apt-get install -y ./session-manager-plugin.deb && \
rm -f session-manager-plugin.deb && \
rm -rf /var/lib/apt/lists/*
# installing PostgreSQL JDBC driver
RUN curl -o /opt/bitnami/spark/jars/postgresql-42.7.5.jar https://jdbc.postgresql.org/download/postgresql-42.7.5.jar
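Since all three services run this same image, a quick sanity check (a sketch, assuming the cluster is already up and using the service names from the compose file above) is to confirm the driver jar is present in the Spark jars directory of the master and of every worker:
# the jar must exist on every node's classpath; they all share the same image
for svc in spark-master spark-worker-1 spark-worker-2; do
  docker compose exec "$svc" ls /opt/bitnami/spark/jars/ | grep postgresql
done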
In my Spark job I try to connect to the DB and read a table:
import os
from pyspark.sql import SparkSession
# ENVIRONMENT VARIABLES
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
SPARK_HOME = os.getenv("SPARK_HOME")
# PostgreSQL JDBC driver
spark = SparkSession.builder \
    .appName("ETL") \
    .config('spark.jars', f'{SPARK_HOME}/jars/postgresql-42.7.5.jar') \
    .getOrCreate()

DB_URL = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"

sales = spark.read \
    .format("jdbc") \
    .option("url", DB_URL) \
    .option("query", 'SELECT * FROM "PR"."sale" LIMIT 100') \
    .option("user", DB_USER) \
    .option("password", DB_PASSWORD) \
    .load()
sales.show(10)
This is the job that gets mounted into and submitted from the spark-master container.
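Since the job reads all connection details from environment variables, a quick check that the values from .env actually reach the container (a sketch; it prints the variable names only, to avoid echoing the password) can look like this:
# list only the names of the DB_* variables injected via env_file
docker compose exec spark-master env | grep -oE '^DB_[A-Z_]+' | sort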
When the cluster is up and running, I log into the spark-master container and run the first script, which configures the AWS connection and sets up port forwarding:
# setting AWS profile
aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID" --profile prod
aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY" --profile prod
aws configure set region "$AWS_DEFAULT_REGION" --profile prod
aws configure set output "$AWS_OUTPUT" --profile prod
# switch to prod profile
export AWS_PROFILE=prod
# connecting to PostgreSQL AWS RDS
bastion_id=$(***********)
aws ssm start-session --target "$bastion_id" --region **** --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["5432"],"localPortNumber":["5555"],"host":["****"]}'
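From another shell inside the same container, the forwarded port can be verified with a bash-only probe (a sketch; there is no psql client in the image):
# succeeds only if something is listening on the tunnel's local port
timeout 2 bash -c 'exec 3<>/dev/tcp/localhost/5555' \
  && echo "port 5555 reachable" \
  || echo "port 5555 not reachable"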
Then, in another terminal window on the same container, I run the following script, which submits the Spark job:
#!/bin/bash
JOB_PATH=${1:-spark_jobs/etl.py}
# adding execute permissions to the job file
chmod -R +x "$JOB_PATH"
# submitting the Spark job
spark-submit \
  --master spark://spark-master:7077 \
  "$JOB_PATH"
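The driver jar is already baked into the image, so it does not have to be passed explicitly; an equivalent invocation that ships it anyway would look roughly like this (a sketch, not required in this setup):
# same submission, but shipping the JDBC driver explicitly
spark-submit \
  --master spark://spark-master:7077 \
  --jars /opt/bitnami/spark/jars/postgresql-42.7.5.jar \
  "$JOB_PATH"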
My problem is that in the first terminal I am getting
Port 5555 opened for sessionId xxxxxxxxxxxxxxx.
Waiting for connections...
Connection accepted for session xxxxxxxxxxxxxx
then in the second terminal, when I run the spark-submit script, I get:
WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.18.0.3 executor 0): org.postgresql.util.PSQLException: Connection to localhost:5555 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
(...)
Caused by: java.net.ConnectException: Connection refused
So I am getting "Connection accepted" from AWS Session Manager, and the opposite of what is expected, "Connection refused", from the Spark job. Where could this have gone wrong?
Ok, I resolved the issue and I am leaving what worked for me in case someone faces the same problem in the future.

What I did wrong was running the aws ssm start-session process only in the spark-master container. The "Connection accepted" message was the master's own connection being accepted. But Spark uses lazy evaluation: when the driver finally reaches an action (the sales.show(10) in my case), it is the workers that do the actual work. So the worker containers need to read the data themselves (it has not been read yet, precisely because of lazy evaluation), yet they have no tunnel to the DB. In other words, the "Connection accepted" was the master's connection being accepted, and the "Connection refused" error was the Spark workers reporting their failure back to the master, hence the apparent contradiction.

The solution is to run aws ssm start-session in both the master and the worker containers. Each worker then opens its own connection to the DB the first time it is assigned a piece of work that involves reading data.
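For anyone scripting this, a rough sketch of what that can look like (assuming the aws_rds_session.sh bind mount and the .env credentials are also added to the two worker services, which the compose file above only defines for the master):
# open the port-forwarding session in the background in every container
# before submitting the job (service names as in the compose file above)
for svc in spark-master spark-worker-1 spark-worker-2; do
  docker compose exec -d "$svc" bash /opt/bitnami/spark/aws_rds_session.sh
done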