postgresql, amazon-web-services, docker, apache-spark, docker-compose

Spark on Docker Fails to Connect to AWS RDS PostgreSQL via Bastion


I have an Apache Spark cluster running on a Docker Compose network that needs to connect to an AWS RDS instance (PostgreSQL, specifically). The following is the compose.yml file I am using to define the Spark cluster:

services:
  spark-master:
    build: .
    command: bin/spark-class org.apache.spark.deploy.master.Master
    expose:
      - 8080     
      - 4040
      - 7077
    ports:
      - "9095:8080" 
      - "7777:7077" 
      - "4040:4040"
    environment:
        SPARK_PUBLIC_DNS: localhost
    volumes:
      - ./spark_jobs:/opt/bitnami/spark/spark_jobs
      - ./utility:/opt/bitnami/spark/utility
      - type: bind
        source: ./aws_rds_session.sh
        target: /opt/bitnami/spark/aws_rds_session.sh
      - type: bind
        source: ./spark_submit.sh
        target: /opt/bitnami/spark/spark_submit.sh
    #loading environment variables
    env_file:
      - ./.env
    extra_hosts:
      - "host.docker.internal:host-gateway"
    networks:
      - spark-cluster-network

  spark-worker-1:
    build: .
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077        
      SPARK_PUBLIC_DNS: localhost  
    env_file:
      - ./.env
    extra_hosts:
      - "host.docker.internal:host-gateway"
    networks:
      - spark-cluster-network
    ports:
      - "4041:4040"
    expose:
      - 4040

  spark-worker-2:
    build: .
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077  
      SPARK_PUBLIC_DNS: localhost
    env_file:
      - ./.env
    extra_hosts:
      - "host.docker.internal:host-gateway"
    networks:
      - spark-cluster-network
    ports:
      - "4042:4040"
    expose:
      - 4040

networks:
  spark-cluster-network:
    driver: bridge

The custom image I am using is the following:

FROM bitnami/spark:latest
USER root
# installing some dependencies
RUN apt-get update && apt-get install -y curl zip unzip 

# install AWS CLI 
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
    unzip awscliv2.zip && \ 
    ./aws/install
# install AWS session manager plugin 
RUN curl "https://s3.amazonaws.com/session-manager-downloads/plugin/latest/ubuntu_64bit/session-manager-plugin.deb" -o "session-manager-plugin.deb" && \
    apt-get update && apt-get install -y ./session-manager-plugin.deb && \
    rm -f session-manager-plugin.deb && \
    rm -rf /var/lib/apt/lists/*

# installing PostgreSQL JDBC driver
RUN curl -o /opt/bitnami/spark/jars/postgresql-42.7.5.jar https://jdbc.postgresql.org/download/postgresql-42.7.5.jar

In my Spark job, I try to connect to the DB and read a table:

import os
from pyspark.sql import SparkSession

# ENVIRONMENT VARIABLES
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
SPARK_HOME = os.getenv("SPARK_HOME")

# create the Spark session with the PostgreSQL JDBC driver on the classpath
spark = SparkSession.builder\
    .appName("ETL")\
    .config('spark.jars', f'{SPARK_HOME}/jars/postgresql-42.7.5.jar')\
    .getOrCreate()


DB_URL = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"

sales = spark.read \
    .format("jdbc") \
    .option("url", DB_URL) \
    .option("query", 'SELECT * FROM "PR"."sale" LIMIT 100') \
    .option("user", DB_USER) \
    .option("password", DB_PASSWORD) \
    .load()
sales.show(10)

This is the job that gets mounted into and submitted from the spark-master container.
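
For context, the DB_* values come from the .env file loaded by compose; they point at the local end of the SSM tunnel rather than at the RDS endpoint itself. Roughly (placeholders, not my real values):

# .env (illustrative) -- DB_HOST/DB_PORT are the local end of the SSM tunnel
DB_HOST=localhost
DB_PORT=5555
DB_NAME=<database name>
DB_USER=<database user>
DB_PASSWORD=<database password>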

When the cluster is up and running, I log into the spark-master container and run the first script, which configures the AWS profile and sets up port forwarding:

# setting AWS profile
aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID" --profile prod
aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY" --profile prod
aws configure set region "$AWS_DEFAULT_REGION" --profile prod
aws configure set output "$AWS_OUTPUT" --profile prod
# switch to prod profile
export AWS_PROFILE=prod
# connecting to PostgreSQL AWS RDS
bastion_id=$(***********)
aws ssm start-session --target $bastion_id --region **** --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["5432"],"localPortNumber":["5555"],"host":["****"]}' 
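
(To double-check that the local end of the tunnel is actually listening inside the container, a quick test with bash's built-in /dev/tcp works; this is just a sanity check, not part of my scripts:)

# prints "listening" only if something accepts connections on localhost:5555
(exec 3<>/dev/tcp/localhost/5555) && echo "listening" || echo "not listening"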

Then, in another terminal window on the same container, I run the following script to submit the Spark job:

#!/bin/bash
JOB_PATH=${1:-spark_jobs/etl.py}
# adding execute permissions to spark_jobs
chmod -R +x $JOB_PATH

# submitting the Spark job
spark-submit \
    --master spark://spark-master:7077 \
    $JOB_PATH

My problem: in the first terminal, I am getting

Port 5555 opened for sessionId xxxxxxxxxxxxxxx.
Waiting for connections...

Connection accepted for session xxxxxxxxxxxxxx

then in the second one, when I run the spark-submit script, I get:

WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.18.0.3 executor 0): org.postgresql.util.PSQLException: Connection to localhost:5555 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
(...)
Caused by: java.net.ConnectException: Connection refused

So the AWS Session Manager reports the connection as accepted, while the Spark job reports the opposite of what I expect: connection refused. Where could this have gone wrong?


Solution

  • OK, I resolved the issue, and I am leaving what worked for me here in case someone faces the same problem in the future.

    What I did wrong was running the aws ssm start-session process only in the spark-master container. Spark transformations are lazy, so nothing is actually read until the job hits an action (the sales.show(10) in my case), and at that point it is the workers, not the master, that do the actual reading. So the worker containers needed data that had not been read yet, but they had no tunnel to the DB. The "Connection accepted" message was the master's connection being accepted, and the "Connection refused" error was the workers reporting their failures back to the master, hence the apparent contradiction.

    The solution is to run aws ssm start-session in the worker containers as well as in the master. Each worker then opens its own connection to the DB the first time it is assigned a piece of work that involves reading data. A rough sketch of starting the tunnel everywhere is below.
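
    For completeness, a minimal sketch of starting the tunnel in every container from the host. It assumes aws_rds_session.sh (and the AWS credentials from .env) are also available in the worker services; the compose file in the question only mounts the script into spark-master, so the volume entries need to be copied to the workers first:

    # start the SSM port-forwarding tunnel in the background of each container,
    # so every executor can reach RDS on its own localhost:5555
    for c in spark-master spark-worker-1 spark-worker-2; do
        docker compose exec -d "$c" bash /opt/bitnami/spark/aws_rds_session.sh
    done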