I plan to mask the data in batches using a UDF. The UDF calls ECC and AES routines to mask the data; the concrete packages include `ecies`.
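For concreteness, here is a minimal stand-in sketch of the masking function. The real UDF uses ECC/AES packages such as `ecies`, which are not standard library, so HMAC-SHA256 stands in for the cipher here; the key and column names are illustrative, not from the actual job.

```python
import hashlib
import hmac

# Placeholder key; in the real job this would come from the ECC/AES setup.
MASK_KEY = b"demo-key"

def mask_value(plaintext: str) -> str:
    """Deterministically mask one column value (stand-in for ECC+AES)."""
    digest = hmac.new(MASK_KEY, plaintext.encode("utf-8"), hashlib.sha256)
    return digest.hexdigest()

# In the Spark job this function is wrapped as a UDF, roughly:
#   from pyspark.sql.functions import udf
#   mask_udf = udf(mask_value)
#   df_result = df.withColumn("masked_col", mask_udf("plain_col"))
print(mask_value("123-45-6789"))
```

Because the function is pickled on the driver and unpickled on each executor, every package it imports must exist in the executors' Python environment.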
I got the following error:
Driver stacktrace:
22/03/21 11:30:52 INFO DAGScheduler: Job 1 failed: showString at NativeMethodAccessorImpl.java:0, took 1.766196 s
Traceback (most recent call last):
File "/home/hadoop/pyspark-dm.py", line 495, in <module>
df_result.show()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 485, in show
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 588, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'ecies'
I loaded the environment via the archives mechanism:
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
# spark session initialization
spark = (
    SparkSession.builder
    .config('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
    .config('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
    .config('spark.sql.catalogImplementation', 'hive')
    .config('spark.archives', 'pyspark_venv.tar.gz#environment')
    .getOrCreate()
)
I packed the dependency libraries with venv-pack and uploaded the archive via spark-submit.
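The packaging step follows the venv-pack pattern from the Spark Python package management docs; the exact package list below is illustrative (`eciespy` is the PyPI distribution that provides the `ecies` module, and `pycryptodome` is one common AES provider):

```shell
# Build a virtualenv containing the UDF's dependencies (package names illustrative)
python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install eciespy pycryptodome venv-pack

# Pack it into the archive referenced by spark.archives
venv-pack -o pyspark_venv.tar.gz
```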
22/03/21 11:44:36 INFO SparkContext: Unpacking an archive pyspark_venv.tar.gz#environment from /mnt/tmp/spark-060999fd-4410-405d-8d15-1b832d09f86c/pyspark_venv.tar.gz to /mnt/tmp/spark-dc9e1f8b-5d91-4ccf-8f20-d85ed72e9eca/userFiles-1c03e075-1fb2-4ffd-a527-bb4d650e4df8/environment
When I executed the PySpark script in local mode, it worked well.
from pyspark import SparkConf

conf = SparkConf()
conf.setExecutorEnv('PYSPARK_PYTHON', './environment/bin/python')
conf.set('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
conf.set('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
conf.set('spark.sql.catalogImplementation', 'hive')
conf.set("spark.archives", "pyspark_venv.tar.gz#environment")
spark-submit pyspark-dm.py --archives pyspark_venv.tar.gz
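For comparison, the invocation documented in Spark's Python package management guide puts `--archives` before the application script (anything after the script is passed to the application as arguments, not parsed by spark-submit) and keeps the `#environment` alias so the relative interpreter path resolves; the cluster-mode variant below is the documented YARN pattern, not something I have verified on this cluster:

```shell
# Client mode: driver uses the node's Python, executors use the unpacked venv
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment pyspark-dm.py

# YARN cluster mode: the application master also needs the venv interpreter
# spark-submit --archives pyspark_venv.tar.gz#environment \
#   --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
#   pyspark-dm.py
```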