I am working on a project to port a Python proof of concept (POC) over to PySpark. The POC heavily leverages Postgres and specifically the PostGIS geospatial library. Most of the work consists of Python issuing commands to Postgres before calling back the data for final processing.
Some of the queries being passed to Postgres contain CREATE TABLE, INSERT, CREATE TEMP TABLE, and CTE WITH statements. I am trying to determine if it is possible to pass these queries to Postgres from Spark via JDBC.
Can someone confirm whether this functionality is available within Spark's JDBC support for other databases? To be clear, I want to pass plain SQL queries to Postgres, not use the available SparkSQL APIs (as they don't support all the operations I need). I am using Spark version 2.3.0, PostgreSQL 10.11, and Python 2.7.5 (yes, I know about EOL for Python 2; that's another story).
Here is what I've tried so far:
SparkSession.read
postgres = SparkSession.builder \
.appName("myApp") \
.config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
.getOrCreate()
Define the query to pass to the dbtable param:
qry = """create table test (name varchar(50), age int)"""
Pass qry to the dbtable param of the Postgres Spark session object:
postgres.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://....") \
.option("dbtable", qry) \
.option("user", configs['user']) \
.option("password", configs['password']) \
.option("driver", "org.postgresql.Driver") \
.option("ssl", "true") \
.load()
Which returns the following syntax error (the same sort of error results when using the other SQL commands listed above):
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 9, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
Position: 15
SparkSession.sql()
Using the postgres object defined above:
postgres.sql("""create table (name varchar(50), age int)""")
Which returns the following parse exception:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"
And if I wrap the query in parentheses, as in postgres.sql("(create table (name varchar(50), age int))"), I then get a different parse exception that leads me to believe the functionality I want is not possible:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"
My questions boil down to:
Is what I'm trying to accomplish even possible?
Can the spark.sql() API be leveraged somehow with Postgres?
I have scoured the internet for examples of using SparkSQL to issue these sorts of SQL queries to PostgreSQL but have not found any solutions. If there is a solution, I would appreciate seeing an example; otherwise, confirmation that this is not possible will be more than sufficient.
Is what I'm trying to accomplish even possible?
I would say no. Spark is a data processing framework, so its API is mostly designed for read and write operations against data sources. In your case you have DDL statements, and Spark isn't meant to perform such operations.
For example, the dbtable option from your first example has to be a table name or a parenthesised SELECT subquery, i.e. something that is valid in a FROM clause.
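To make that constraint concrete, here is a minimal Python sketch. It assumes Spark resolves the schema by sending a query shaped like SELECT * FROM <dbtable> WHERE 1=0, which is consistent with the "Position: 15" in your Postgres error. The helper names as_dbtable and schema_probe are hypothetical, made up for illustration; they are not part of any Spark API.

```python
def as_dbtable(select_sql, alias="subq"):
    """Wrap a SELECT so that it is valid inside a FROM clause.

    Postgres expects an alias on a subquery in FROM, so one is always added.
    """
    return "({0}) AS {1}".format(select_sql.strip().rstrip(";"), alias)


def schema_probe(dbtable):
    """Build a query with the shape assumed for Spark's schema lookup."""
    return "SELECT * FROM {0} WHERE 1=0".format(dbtable)


# The DDL from the question, dropped straight into the probe:
qry = "create table test (name varchar(50), age int)"
probe = schema_probe(qry)
print(probe.index("create") + 1)  # 1-based position of "create" -> 15

# A SELECT wrapped for use as the dbtable option:
print(as_dbtable("select name, age from test"))
# -> (select name, age from test) AS subq

# Hypothetical use with the session from the question:
# postgres.read.format("jdbc") \
#     .option("dbtable", as_dbtable("select name, age from test")) \
#     ...
```

Whatever goes into dbtable ends up after FROM, so a CREATE TABLE or INSERT has nowhere to fit, while a wrapped SELECT does.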
If you need to run DDL, DCL, or TCL statements, you should do so another way, e.g. via the psycopg2 module.
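As a rough illustration of the psycopg2 route, the sketch below runs a list of statements in a single transaction from the driver program, outside Spark. The function names run_statements and connect_postgres are hypothetical, and so are the "host" and "dbname" keys in configs (only "user" and "password" appear in your question). run_statements only relies on the standard DB-API methods, so it is not tied to psycopg2 specifically.

```python
def run_statements(conn, statements):
    """Execute SQL statements on a DB-API connection in one transaction."""
    cur = conn.cursor()
    try:
        for stmt in statements:
            cur.execute(stmt)
        conn.commit()      # make the DDL/DML visible to other sessions
    except Exception:
        conn.rollback()    # undo everything if any statement fails
        raise
    finally:
        cur.close()


def connect_postgres(configs):
    """Open a psycopg2 connection; the configs keys here are assumptions."""
    # Imported lazily so run_statements stays usable without psycopg2.
    import psycopg2
    return psycopg2.connect(
        host=configs["host"],
        dbname=configs["dbname"],
        user=configs["user"],
        password=configs["password"],
        sslmode="require",
    )


# Hypothetical usage:
# conn = connect_postgres(configs)
# run_statements(conn, [
#     "create table test (name varchar(50), age int)",
#     "insert into test values ('alice', 30)",
# ])
# conn.close()
```

Once the tables exist, you can read the results back with the Spark JDBC reader. Note that a CREATE TEMP TABLE is only visible inside the connection that created it, so Spark's own JDBC connections will not see it.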
Can the spark.sql() API be leveraged somehow with Postgres?
spark.sql is a method for running SparkSQL code against tables or views registered within the SparkSession. It works with any supported data source, not only JDBC, but it runs on the Spark side with SparkSQL syntax. For example:
val spark = SparkSession
...
.getOrCreate()
spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://ip/database_name")
.option("dbtable", "schema.tablename")
.load()
.createOrReplaceTempView("my_spark_table_over_postgresql_table")
// and then you can operate with a view:
val df = spark.sql("select * from my_spark_table_over_postgresql_table where ... ")
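Since the question uses PySpark, here is a rough Python equivalent of the snippet above. The helper name register_jdbc_view is hypothetical, and the user, password, and driver options are taken from the question rather than from the Scala example. It takes the SparkSession as an argument instead of creating one.

```python
def register_jdbc_view(spark, url, dbtable, view_name, user, password):
    """Load a Postgres table over JDBC and register it as a Spark temp view."""
    df = (spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", dbtable)  # table name or parenthesised subquery
          .option("user", user)
          .option("password", password)
          .option("driver", "org.postgresql.Driver")
          .load())
    # The view lives only in this SparkSession; nothing is created in Postgres.
    df.createOrReplaceTempView(view_name)
    return df


# Hypothetical usage with the session and configs from the question:
# register_jdbc_view(postgres, "jdbc:postgresql://ip/database_name",
#                    "schema.tablename",
#                    "my_spark_table_over_postgresql_table",
#                    configs["user"], configs["password"])
# df = postgres.sql("select * from my_spark_table_over_postgresql_table")
```

The query passed to postgres.sql is parsed by Spark, so it has to be valid SparkSQL rather than Postgres-specific SQL.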