apache-spark pyspark

How does Spark load a Python package that depends on an external library?


I plan to mask data in batches using a UDF. The UDF calls ECC and AES routines to mask the data; the concrete packages are:

I got the following error:

Driver stacktrace:
22/03/21 11:30:52 INFO DAGScheduler: Job 1 failed: showString at NativeMethodAccessorImpl.java:0, took 1.766196 s
Traceback (most recent call last):
  File "/home/hadoop/pyspark-dm.py", line 495, in <module>
    df_result.show()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 485, in show
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 588, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'ecies'

I loaded the environment through spark.archives:

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
# spark session initialization 
spark = (
    SparkSession.builder
    .config('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
    .config('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
    .config('spark.sql.catalogImplementation', 'hive')
    .config("spark.archives", "pyspark_venv.tar.gz#environment")
    .getOrCreate()
)

I packed the dependencies with venv-pack and uploaded the archive with spark-submit:

22/03/21 11:44:36 INFO SparkContext: Unpacking an archive pyspark_venv.tar.gz#environment from /mnt/tmp/spark-060999fd-4410-405d-8d15-1b832d09f86c/pyspark_venv.tar.gz to /mnt/tmp/spark-dc9e1f8b-5d91-4ccf-8f20-d85ed72e9eca/userFiles-1c03e075-1fb2-4ffd-a527-bb4d650e4df8/environment

When I executed the pyspark script in local mode, it worked well.


Solution
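
The traceback in the question fails inside pickle.loads on the worker. That suggests the UDF was serialized with a reference to the ecies module rather than a copy of it, so the executor's interpreter must be able to import ecies on its own. The following is a minimal sketch of that mechanism using only the standard library. PySpark serializes functions with cloudpickle rather than plain pickle, and fake_masking_lib and mask are made-up stand-ins, so treat this as an illustration of the failure mode, not of Spark internals.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a third-party package such as ecies.
MODULE_NAME = "fake_masking_lib"


def mask(value):
    """Pretend masking routine that lives in the stand-in module."""
    return "*" * len(value)


def simulate_missing_module():
    """Pickle a function by reference, then unpickle without its module."""
    module = types.ModuleType(MODULE_NAME)
    mask.__module__ = MODULE_NAME  # pickle will record "fake_masking_lib.mask"
    module.mask = mask
    sys.modules[MODULE_NAME] = module

    # "Driver" side: the module is importable, so pickling succeeds.
    payload = pickle.dumps(mask)

    # "Executor" side: the module is not available to this interpreter.
    del sys.modules[MODULE_NAME]
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(simulate_missing_module())
```

The returned message has the same shape as the error in the question, with the stand-in module name in place of ecies.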

  • Build the archives

  • Modify the source code

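    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    conf = SparkConf()
    # Point the executors at the Python interpreter inside the unpacked archive
    conf.setExecutorEnv('PYSPARK_PYTHON', './environment/bin/python')
    conf.set('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
    conf.set('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
    conf.set('spark.sql.catalogImplementation', 'hive')
    # "#environment" is the directory name the archive is unpacked under
    conf.set("spark.archives", "pyspark_venv.tar.gz#environment")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()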
    

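  • Submit the job

    Options must come before the script; anything after the script name is passed to the script itself as an argument.

    spark-submit --archives pyspark_venv.tar.gz#environment pyspark-dm.py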
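
To check that the executors really use the packed environment, it can help to ask them which interpreter they run and whether they can find the module. The sketch below assumes an existing SparkSession named spark. The helper names executor_python_path, probe_module and probe_executors are hypothetical and not part of PySpark. The first helper only shows how the "#environment" alias in spark.archives relates to the PYSPARK_PYTHON path used above.

```python
import importlib.util
import sys


def executor_python_path(archive_spec):
    """Derive the interpreter path from an '<archive>#<alias>' spec."""
    _, sep, alias = archive_spec.partition("#")
    if not sep or not alias:
        raise ValueError("expected '<archive>#<alias>', got %r" % archive_spec)
    return "./%s/bin/python" % alias


def probe_module(module_name):
    """Report the running interpreter and whether it can find a top-level module."""
    found = importlib.util.find_spec(module_name) is not None
    return (sys.executable, module_name, found)


def probe_executors(spark, module_name, num_tasks=8):
    """Run probe_module inside executor tasks and return the distinct results."""
    rdd = spark.sparkContext.parallelize(range(num_tasks), num_tasks)
    return rdd.map(lambda _: probe_module(module_name)).distinct().collect()
```

Calling probe_executors(spark, "ecies") after the session is created returns one tuple per distinct interpreter. If the archive is in use, the interpreter path would be expected to point inside the unpacked environment directory and the last field should be True. A False there would point to the same problem as the ModuleNotFoundError in the question.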