Tags: python, postgresql, apache-spark, jdbc, pyspark

SparkSQL JDBC (PySpark) to Postgres - Creating Tables and Using CTEs


I am working on a project to port a Python proof of concept (POC) over to PySpark. The POC relies heavily on Postgres, and specifically on the PostGIS geospatial library. Most of the work consists of Python issuing commands to Postgres and then pulling the data back for final processing.

Some of the queries being passed to Postgres contain CREATE TABLE, INSERT, CREATE TEMP TABLE, and CTE WITH statements. I am trying to determine if it is possible to pass these queries to Postgres from Spark via JDBC.

Can someone confirm whether Spark's JDBC support for external databases offers this functionality? To be clear, I want to pass plain SQL queries to Postgres, not use the SparkSQL APIs (as they don't support all the operations I need). I am using Spark version 2.3.0, PostgreSQL 10.11, and Python 2.7.5 (yes, I know about EOL for Python 2; that's another story).

Here is what I've tried so far:

Using SparkSession.read

create Spark session to Postgres

from pyspark.sql import SparkSession

postgres = SparkSession.builder \
    .appName("myApp") \
    .config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
    .getOrCreate()

define query to be passed to dbtable param

qry = """create table test (name varchar(50), age int)"""

pass qry to dbtable param of Postgres spark session object

postgres.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://....") \
    .option("dbtable", qry) \
    .option("user", configs['user']) \
    .option("password", configs['password']) \
    .option("driver", "org.postgresql.Driver") \
    .option("ssl", "true") \
    .load()

This returns the following syntax error (the same sort of error results when using the other SQL commands listed above):

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 9, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
  Position: 15

Using SparkSession.sql()

leverage same postgres object defined above

pass query to .sql()

postgres.sql("""create table (name varchar(50), age int)""")

Which returns the following parse exception:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"

And if I wrap the query in parentheses, as in postgres.sql("(create table (name varchar(50), age int))"), I get a different parse exception that leads me to believe the functionality I want is not possible:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"

My questions boil down to:

  1. Is my approach missing some sort of config or other necessary step?
  2. Can the spark.sql() API be leveraged somehow with Postgres?
  3. Is what I'm trying to accomplish even possible?

I have scoured the internet for examples of using SparkSQL to issue these sorts of SQL queries to PostgreSQL but have not found any solutions. If there is a solution, I would appreciate seeing an example; otherwise, confirmation that this is not possible will be more than sufficient.


Solution

  • Is what I'm trying to accomplish even possible?

    I would say no. Spark is a data-processing framework, so its API is mostly designed for reading from and writing to data sources. Your queries include DDL statements, and Spark is not meant to execute those.
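
To illustrate the write side: Spark's JDBC writer can cover the simple "create a table and insert rows" case, because it creates the target table when it does not exist yet. A minimal sketch, assuming a DataFrame is already built; the function name append_to_postgres and all of its arguments are hypothetical:

```python
def append_to_postgres(df, url, table, user, password):
    """Append the rows of a Spark DataFrame to a Postgres table over JDBC."""
    properties = {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    # mode="append" inserts the rows; if the table is missing,
    # Spark creates it from the DataFrame schema before inserting.
    df.write.jdbc(url=url, table=table, mode="append", properties=properties)
```

This does not help with temp tables or arbitrary DDL; it only shows what the write API is designed for.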

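    For example, the dbtable option from your first example has to be something that is valid in a FROM clause: a table name or a parenthesized SELECT subquery with an alias. Spark embeds the value in a SELECT of its own, which is consistent with Postgres reporting the syntax error at position 15 rather than at the start of your statement.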
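
The following sketch shows why the DDL fails and how a read-only query, including a CTE, can be passed instead. It assumes Spark 2.x probes the schema with a query shaped like SELECT * FROM <dbtable> WHERE 1=0, which matches the "Position: 15" in the error above; the helper names as_dbtable and schema_probe, the alias, and the sample CTE are made up for illustration:

```python
def as_dbtable(select_sql, alias="subq"):
    """Wrap a SELECT (or WITH ... SELECT) so it is valid inside a FROM clause."""
    return "({}) AS {}".format(select_sql.strip().rstrip(";"), alias)


def schema_probe(dbtable):
    """Approximate the schema-probe query Spark builds around dbtable (assumption)."""
    return "SELECT * FROM {} WHERE 1=0".format(dbtable)


ddl = "create table test (name varchar(50), age int)"
# 1-based offset of the DDL inside the probe query; evaluates to 15
ddl_position = schema_probe(ddl).index("create") + 1

# Hypothetical CTE over the table from the question
cte = "WITH adults AS (SELECT name, age FROM test WHERE age >= 18) SELECT * FROM adults"
dbtable_value = as_dbtable(cte, alias="adults_q")
```

The resulting dbtable_value can be passed to .option("dbtable", ...). PostgreSQL generally accepts a WITH query inside a parenthesized subquery, but CREATE and INSERT statements can never appear there.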

    If you need to run DDL, DCL, or TCL statements, you should do it another way, e.g. with the psycopg2 module.
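
A minimal sketch of that route, assuming psycopg2 is installed on the driver node. The helper names and the connection arguments are placeholders, not values from the question:

```python
def connect_postgres(host, dbname, user, password):
    """Open a psycopg2 connection (all arguments are placeholders)."""
    import psycopg2  # imported lazily; run_statements works with any DB-API driver
    return psycopg2.connect(host=host, dbname=dbname, user=user, password=password)


def run_statements(conn, statements):
    """Run SQL statements in order; commit at the end, roll back on any error."""
    cur = conn.cursor()
    try:
        for statement in statements:
            cur.execute(statement)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()


# Hypothetical usage from the Spark driver, before reading the result via JDBC:
# conn = connect_postgres("db-host", "mydb", configs['user'], configs['password'])
# run_statements(conn, [
#     "create table test (name varchar(50), age int)",
#     "insert into test values ('ann', 30)",
# ])
# conn.close()
```

Keep in mind that a TEMP table created this way is only visible to that psycopg2 session; Spark's JDBC reader opens its own connections and will not see it.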

  • Can the spark.sql() API be leveraged somehow with Postgres?

    spark.sql is a method for running SparkSQL code against tables or views registered in the SparkSession. It works with any supported data source, not only JDBC, but it runs on the Spark side and uses SparkSQL syntax. For example:

    val spark = SparkSession
            ...
            .getOrCreate()
    
    spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://ip/database_name")
      .option("dbtable", "schema.tablename")
      .load()
      .createOrReplaceTempView("my_spark_table_over_postgresql_table")
    
    // and then you can query the view with SparkSQL:
    val df = spark.sql("select * from my_spark_table_over_postgresql_table where ... ")
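
Since the question uses PySpark, the same idea can be sketched in Python. This assumes an existing SparkSession is passed in as spark; the function name query_postgres_view, its parameters, and the where_clause argument are hypothetical:

```python
def query_postgres_view(spark, url, dbtable, view_name, where_clause, user, password):
    """Register a Postgres table as a Spark temp view, then query it with SparkSQL."""
    options = {
        "url": url,
        "dbtable": dbtable,  # a table name or a parenthesized, aliased subquery
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    spark.read.format("jdbc").options(**options).load() \
        .createOrReplaceTempView(view_name)
    # This SQL is parsed by Spark, not by Postgres.
    return spark.sql("select * from {} where {}".format(view_name, where_clause))
```

The returned DataFrame is evaluated by Spark, so PostGIS functions are not available in the where clause unless they are applied inside the dbtable subquery.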