pyspark, spark-bigquery-connector

Issues Connecting PySpark to BigQuery


I am trying to load some data from BigQuery into a PySpark DataFrame. I get an odd error message when running the query and can't seem to find any solutions to it.

# Scala version 2.12.15
# Spark version 3.2.1

import pyspark
from pyspark.sql import SparkSession

config = pyspark.SparkConf() \
    .setAll(
        [
            ('spark.jars.packages', 'com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0,com.google.cloud.spark:spark-3.2-bigquery:0.30.0,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0,com.google.cloud.spark:spark-bigquery_2.12:0.30.0'), # Add the BQ jar file
            ('spark.storage.memoryFraction', '1'),
            ("spark.executor.instances", "20"), # set number of executors to 20
            ("spark.executor.cores", "15"), # set number of cores per executor to 15
            ("spark.executor.memory", "4g"), # set executor memory to 4 gigabytes
            ("spark.driver.memory", "8g"), # set driver memory to 8 gigabytes
            ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"), # use Kryo serializer, great for clustering
        ]
)
sc = pyspark.SparkContext(conf=config)
# Create the Spark Session
spark = SparkSession.builder.appName('pyspark-big-data-workflow').getOrCreate()

Running the code below then results in an error:

table = 'table.name'
df = spark.read.format('bigquery') \
    .option('table', table) \
    .load()

df.show()
df.printSchema()

The Error

Py4JJavaError: An error occurred while calling o48.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
    at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:582)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:804)
    at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:722)
    at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1395)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException:  This connector was made for Scala null, it was not meant to run on Scala 2.12
    at com.google.cloud.spark.bigquery.BigQueryUtilScala$.validateScalaVersionCompatibility(BigQueryUtil.scala:37)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:42)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:49)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:780)
    ... 31 more

I'm not sure if my jar files are incorrect or if this has to do with something else. Any direction here would be greatly appreciated!


Solution

  • Try changing your spark.jars.packages line to the one below and try again (a full configuration sketch follows this list): ('spark.jars.packages', 'com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0')

    If it still fails on the cluster, look in the cluster's $SPARK_HOME/jars (or wherever the Spark jars live); there will be another BigQuery jar there. Either get rid of it or replace it with the spark-bigquery-with-dependencies jar (see the listing sketch below).

    The root cause is the file spark-bigquery-connector.properties, which is present in both jars. The copy in the spark-bigquery-with-dependencies jar defines the scala.binary.version property; the copies in the other jars do not, which is why the error above reports "made for Scala null" (see the inspection sketch below). Hope it helps.
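
A minimal sketch of the corrected setup, assuming the rest of the original configuration stays the same and 'table.name' remains a placeholder:

import pyspark
from pyspark.sql import SparkSession

# Keep only spark-bigquery-with-dependencies; the overlapping
# spark-3.2-bigquery and spark-bigquery_2.12 artifacts are dropped.
config = pyspark.SparkConf().setAll([
    ('spark.jars.packages',
     'com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0,'
     'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0'),
])
sc = pyspark.SparkContext(conf=config)
spark = SparkSession.builder.appName('pyspark-big-data-workflow').getOrCreate()

df = spark.read.format('bigquery').option('table', 'table.name').load()
df.show()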
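
If the error persists on a cluster, a quick way to spot the conflicting jar is to list the BigQuery artifacts shipped under $SPARK_HOME/jars. A sketch, assuming SPARK_HOME is set in the environment:

import glob
import os

# Any BigQuery jar here other than spark-bigquery-with-dependencies
# is a candidate for removal or replacement.
spark_home = os.environ['SPARK_HOME']
for jar in sorted(glob.glob(os.path.join(spark_home, 'jars', '*bigquery*.jar'))):
    print(jar)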
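
To confirm the root cause yourself, you can read spark-bigquery-connector.properties straight out of each jar, since a jar is just a zip archive. A sketch; the jar filename and the assumption that the properties file sits at the jar root are illustrative, not verified:

import zipfile

# Hypothetical local path to one of the connector jars.
jar_path = 'spark-bigquery-with-dependencies_2.12-0.30.0.jar'

with zipfile.ZipFile(jar_path) as jar:
    # The "good" jar defines scala.binary.version in this file; the others
    # do not, which is what surfaces as "made for Scala null" at load time.
    print(jar.read('spark-bigquery-connector.properties').decode('utf-8'))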