python, apache-spark, pyspark, avro, spark-avro

PySpark works in terminal but not when executed in Python code


I am trying to read an Avro file. The following is a sample data source that I found online to test my code:

https://github.com/Teradata/kylo/blob/master/samples/sample-data/avro/userdata1.avro

The following is my code (please assume that source_path is the path to the data linked above):

from pyspark.sql import SparkSession

def avro_reader(source_path: str):

    spark = SparkSession \
        .builder \
        .master("yarn") \
        .enableHiveSupport() \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .getOrCreate()

    reader = spark.read.format("com.databricks.spark.avro").load(source_path)

    return reader.show()

print(avro_reader(source_path))

The following is the error that I am receiving:

Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
        at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:631)
        at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:271)
        at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:234)
        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:119)
        at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:1022)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:85)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Spark works perfectly fine when I run pyspark in the terminal, so I am unsure what is causing this issue. Below is the output of running pyspark in the terminal:

Python 3.8.2 (default, Apr  8 2021, 23:19:18) 
[Clang 12.0.5 (clang-1205.0.22.9)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
21/06/15 01:15:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.2 (default, Apr  8 2021 23:19:18)

Following the recommendation to remove .master("yarn"), this is the error:

Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
21/06/15 12:40:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "main.py", line 88, in <module>
    print(avro_reader('userdata1.avro'))
  File "main.py", line 26, in avro_reader
    reader = spark.read.format("com.databricks.spark.avro").load(source_path)
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 204, in load
    return self._df(self._jreader.load(path))
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/zaki.siyaji/Desktop/avro_proj/venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o36.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.AvroFileFormat.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:666)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:666)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
        ... 14 more

Solution

  • Remove the .master("yarn") call if you're running locally and not on a YARN cluster. If you are running on a YARN cluster, you need the environment set up correctly (HADOOP_CONF_DIR or YARN_CONF_DIR must point to your Hadoop configuration, as the error message says) and you should follow the documentation on submitting to YARN.

    Update after new error message:

    1. You need to change the format from "com.databricks.spark.avro" to "avro", as Avro has been supported by Spark itself since version 2.4.
    2. And you need to submit the job with the correct library attached (doc); a sketch combining both fixes follows below:
    ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:<spark_version> main.py
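    Putting both fixes together, here is a minimal sketch (assuming the file sits in the working directory; the spark-avro version, 3.1.2 here to match the shell banner above, must match your installed Spark, and spark.jars.packages only takes effect on a freshly created session):

    from pyspark.sql import SparkSession

    def avro_reader(source_path: str):
        # Local session instead of YARN, so HADOOP_CONF_DIR / YARN_CONF_DIR
        # are not required.
        spark = SparkSession \
            .builder \
            .master("local[*]") \
            .config("spark.jars.packages",
                    "org.apache.spark:spark-avro_2.12:3.1.2") \
            .getOrCreate()

        # "avro" is the short name of Spark's built-in Avro source (2.4+).
        df = spark.read.format("avro").load(source_path)
        df.show()
        return df

    avro_reader("userdata1.avro")

    Setting spark.jars.packages in the builder is an alternative to passing --packages on the spark-submit command line; either way, the package has to be attached before the session is created.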