I'm having an issue with a spark-submit job. I run

spark-submit --jars /path-to-this/postgresql-42.7.1.jar /path-to-this/large-scale-data-processing/src/etl/load.py

with this script (load.py):
import os

from dotenv import load_dotenv

from spark_handler import SparkHandler


class DataLoader(object):

    def __init__(self):
        load_dotenv()
        # Register PostgreSQL JDBC driver
        self.spark = SparkHandler.create_session()
        self.jdbc_url = "jdbc:postgresql://127.0.0.1:5432/taxi_data"
        self.table_name = "taxi_trips"
        self.properties = {
            "user_name": "taxi_driver",
            "password": "yellowcab"
        }

    def create_table(self):
        query = """
            CREATE TABLE IF NOT EXISTS taxi_trips (
                VendorID INTEGER,
                tpep_pickup_datetime TIMESTAMP,
                tpep_dropoff_datetime TIMESTAMP,
                passenger_count INTEGER,
                trip_distance DECIMAL,
                RatecodeID INTEGER,
                store_and_fwd_flag VARCHAR(1),
                PULocationID INTEGER,
                DOLocationID INTEGER,
                payment_type INTEGER,
                fare_amount DECIMAL,
                extra DECIMAL,
                mta_tax DECIMAL,
                tip_amount DECIMAL,
                tolls_amount DECIMAL,
                improvement_surcharge DECIMAL,
                total_amount DECIMAL,
                congestion_surcharge DECIMAL,
                Airport_fee DECIMAL,
                company_name VARCHAR(255)
            );
        """
        (
            self.spark
            .sql(query)
            .write
            .format("jdbc")
            .option("url", self.jdbc_url)
            .option("dbtable", "taxi_data.taxi_trips")
            .option("user", self.properties["user_name"])
            .option("password", self.properties["password"])
            .save()
        )

    def load_data_into_dw(self):
        df = self.spark.read.parquet(
            os.getenv("DATA_PATH")
            + "/large-scale-data-processing/data/output/joined_table_a"
        )
        (
            df.write.format("jdbc")
            .option("url", self.jdbc_url)
            .option("dbtable", "taxi_data.taxi_trips")
            .option("user", self.properties.get('user_name'))
            .option("password", self.properties.get('password'))
            .save()
        )
        print("Data loaded to dw!")


if __name__ == "__main__":
    dl = DataLoader()
    dl.create_table()
    dl.load_data_into_dw()
And this is spark_handler.py:

from pyspark.sql import SparkSession


class SparkHandler(object):
    """
    Class that takes care of starting up the SparkSession.
    """
    spark: SparkSession = None

    @classmethod
    def create_session(cls) -> SparkSession:
        if cls.spark is None:
            cls.spark = (
                SparkSession
                .builder
                .appName("my_spark_app")
                .config("spark.sql.catalogImplementation", "hive")
                .config("spark.driver.userClassPathFirst", "true")
                .getOrCreate()
            )
        return cls.spark
The terminal output says there is no suitable driver:
py4j.protocol.Py4JJavaError: An error occurred while calling o41.save.
: java.sql.SQLException: No suitable driver
at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.
...
...
...
I've tried to debug it in several different ways:
Checked the obvious "suitable driver" mismatch, but I have Java 11, I'm running Postgres 16.1, and the driver I'm using is postgresql-42.7.1.
Checked my code for errors, but the URL, password, username, etc. all seem correct.
Looked for firewalls and went through the Postgres config files for anything blocking the connection, but found nothing.
I'm just trying to load some data into Postgres to test some Spark code I wrote. Any help would be greatly appreciated. Cheers!
SOLVED:
Read this doc: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
For some odd reason the Postgres JDBC driver version I was trying didn't work, even though 42.7.1 seemingly matched my Java installation and Postgres version. So I went to the Maven repo, grabbed the version referenced in the Spark documentation, and it turns out that one works.
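For anyone hitting the same error, a related knob worth knowing about (a hedged sketch of the write call from load_data_into_dw with one option added, not necessarily what fixed it in my case) is naming the driver class explicitly via the jdbc source's driver option; org.postgresql.Driver is the standard class name shipped in the Postgres JDBC jar:

(
    df.write.format("jdbc")
    .option("url", "jdbc:postgresql://127.0.0.1:5432/taxi_data")
    .option("dbtable", "taxi_data.taxi_trips")
    .option("user", "taxi_driver")
    .option("password", "yellowcab")
    # Point Spark at the driver class directly instead of relying on
    # java.sql.DriverManager discovery, which is what raises "No suitable driver".
    .option("driver", "org.postgresql.Driver")
    .save()
)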
Edit: As per the Spark documentation, also include --driver-class-path in your spark-submit command.
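For reference, the shape of the command the docs describe looks roughly like this (same placeholder paths as above; the jar is whichever driver version you downloaded):

spark-submit \
  --driver-class-path /path-to-this/postgresql-<version>.jar \
  --jars /path-to-this/postgresql-<version>.jar \
  /path-to-this/large-scale-data-processing/src/etl/load.py

--jars ships the driver jar to the executors, while --driver-class-path puts it on the driver JVM's classpath at startup, which is where java.sql.DriverManager needs to see it when the JDBC write runs.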