python · apache-spark · pyspark · jdbc · mssql-jdbc

Connecting to MSSQL Server database using pyspark


I am new to PySpark and am trying to connect to an MS SQL Server database. Here are the details — this is what gets printed when I run my script:

('Processing table:', u'POL_ACTION_AMEND')
('Table schema:', u'dbo')
('Source_database:', u'PRD01_IPS')
('SQL Query:', '(SELECT TOP 100 * FROM PRD01_IPS.dbo.POL_EVENT_HISTORY)')
('jdbc_uri:', 'jdbc:jtds:sqlserver://REPLCLPRD01\REPL/PRD01_IPS')

But spark.read.format() throws an error when trying to load the data from the table.

select_statement_sql_server = "(SELECT TOP 100 * FROM {source_database}.{schema}.{table_name})"


# Read data for a table 
data_df = (
    spark.read.format("jdbc")
        .option("url",source_jdbc_uri)
        .option("user",source_jdbc_user)
        .option("password",source_jdbc_pass)
        .option("driver", source_jdbc_driver)
        .option("dbtable", select_statement_sql_server.format(
            source_database=source_database, schema=schema, table_name='POL_EVENT_HISTORY'
        ))
        .load()
)

data_df.show()

The error I am getting is:

('SQL Query:', '(SELECT TOP 100 * FROM PRD01_IPS.dbo.POL_EVENT_HISTORY)')
Traceback (most recent call last):
  File "/pkg/lxd0bigd/Talend_To_Pyspark/PSTAR_IPS/200job_IPS.py", line 136, in <module>
    .option("dbtable", select_statement_sql_server.format(source_database=source_database, schema=schema, table_name='POL_EVENT_HISTORY')) \
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o236.load.
: java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:71)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:211)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:243)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:231)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:187)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

I run the job via a shell script that contains the spark-submit command:

spark-submit --master yarn --deploy-mode client \
    --driver-class-path ${DRIVER_AND_JAR_FILE_PATH} \
    --jars ${DRIVER_AND_JAR_FILE_PATH},${JAR_FILE_XML} \
    --conf "spark.dynamicAllocation.enabled=true" \
    --conf "spark.yarn.dist.files=/etc/hive/conf.cloudera.hive/hive-site.xml" \
    ${ROOT_DIR}/200job_IPS.py \
    --hdfspath ${XML_FILE_HDFS} >> ${LOGFILE_DIR}/200job_log.txt 2>&1 

sh /pkg/lxd0bigd/Talend_To_Pyspark/PSTAR_IPS/Ingestion_PSTAR.sh dev \
    /pkg/lxd0bigd/Talend_To_Pyspark/spark-jobs/lib/mssql-jdbc-6.2.1.jre7.jar

DRIVER_AND_JAR_FILE_PATH is what I am not sure about. I have three jar files and don't know which one should be used; I tried all three and the same issue persists:

mssql-jdbc-6.2.1.jre7.jar, sqljdbc41.jar, jtds-1.3.1-patch.jar

Any suggestions or solutions would be greatly appreciated.


Solution

  • Found the solution.

    1. Had to change the JDBC URI to "jdbc:sqlserver://REPLCLPRD01\REPL;databaseName=PRD01_IPS"

    2. Changed the driver jar to mssql-jdbc-6.2.1.jre7.jar

    3. If we use .option("query"), the SELECT statement can be passed as-is. If we use .option("dbtable"), the SELECT statement must be wrapped in parentheses and given an alias:

    "(SELECT TOP 100 * FROM {source_database}.{schema}.{table_name}) tmp"
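Putting the three fixes together, the read can be sketched as below. The Spark session, credentials, and host/instance names are placeholders taken from the question; the driver class name assumes the Microsoft mssql-jdbc jar is on the classpath (it is the standard class shipped in that jar). Note that "query" and "dbtable" are mutually exclusive — set only one.

```python
# Sketch of the corrected read, assuming mssql-jdbc-6.2.1.jre7.jar is passed
# via --jars / --driver-class-path as in the spark-submit script above.
jdbc_uri = "jdbc:sqlserver://REPLCLPRD01\\REPL;databaseName=PRD01_IPS"
driver_class = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# With .option("query") the statement is passed as-is: no parentheses, no alias.
query = "SELECT TOP 100 * FROM {source_database}.{schema}.{table_name}".format(
    source_database="PRD01_IPS", schema="dbo", table_name="POL_EVENT_HISTORY"
)

# With .option("dbtable") it must be a parenthesised subquery with an alias.
dbtable = "({query}) tmp".format(query=query)


def read_table(spark, user, password):
    """Read via the 'query' form; to use the other form, replace the
    'query' option with .option('dbtable', dbtable)."""
    return (
        spark.read.format("jdbc")
            .option("url", jdbc_uri)
            .option("user", user)
            .option("password", password)
            .option("driver", driver_class)
            .option("query", query)
            .load()
    )
```

The "query" option requires Spark 2.4+, which matches the CDH 7.1.8 stack shown in the traceback.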