pythonpostgresqlapache-sparkpysparkpy4j

Load SparkSQL dataframe into Postgres database with automatically defined schema


I am currently trying to load a Parquet file into a Postgres database. The Parquet file has schema defined already, and I want that schema to carry over onto a Postgres table.

I have not defined any schema or table in Postgres. But I want the loading process to automatically infer the schema on read and create a table, then load the SparkSQL dataframe into that table.

Here is my code:

import findspark
findspark.init()

from pyspark.sql import SparkSession

appName = "load_parquet"
master = "local"

spark = SparkSession.builder \
        .master(master) \
        .appName(appName) \
        .getOrCreate()

Read in Parquet data as a Spark dataframe

customers_sdf = spark.read.parquet('/home/jovyan/filesystem/customers.parquet')

Check that schema is correct

customers_sdf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

Write SparkSQL dataframe to Postgres

customers_sdf.write \
    .jdbc(
        url="jdbc:postgresql:destdb", 
        table="public.customers", 
        properties={"user": "destdb1", "password": "destdb1"}
    )

My Postgres container hostname is postgres-dest and its port mapping is 5434:5432. See below:

  postgres-dest:
    image: postgres:latest
    environment:
      POSTGRES_USER: destdb1
      POSTGRES_PASSWORD: destdb1
      POSTGRES_DB: destdb
    logging:
      options:
        max-size: 10m
        max-file: "3"
    ports:
      - "5434:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "destdb1"]
      interval: 5s
      retries: 5
    restart: always

  pyspark-notebook:
    build: .
    image: jupyter/pyspark-notebook:latest
    environment:
      JUPYTER_ENABLE_LAB: 'yes'
    ports:
      - "8889:8889"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work/notebooks
      - ./filesystem:/home/jovyan/filesystem

I try to write the dataframe to Postgres as shown before, but I get this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_101/2914557055.py in <module>
----> 1 customers_sdf.write \
      2     .jdbc(url="jdbc:postgresql://postgres-dest/destdb", table="public.customers", properties={"user": "destdb1", "password": "destdb1"})

/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, mode, properties)
   1443         for k in properties:
   1444             jprop.setProperty(k, properties[k])
-> 1445         self.mode(mode)._jwrite.jdbc(url, table, jprop)
   1446 
   1447 

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o231.jdbc.
: java.sql.SQLException: No suitable driver
    at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:817)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

NOTE: I am an absolute beginner to Spark, so please explain like I am 5 years old.


Solution

  • Change url to jdbc:postgresql://postgres-dest:5432/destdb.

    And make sure that PostgreSQL driver jar is present in classpath. You can download the jar from here.