I want to unzip many files in 7z format in PySpark on Databricks. The zip files contain several thousand tiny files.
I read the files with the binaryFile data source and use a UDF to unzip them:
import io

import py7zr
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

schema = ArrayType(StringType())

@F.udf(returnType=schema)
def unzip_content_udf(content):
    extracted_file_contents = []
    with py7zr.SevenZipFile(io.BytesIO(content), mode='r') as z:
        for name, bytes_stream in z.readall().items():
            if name.startswith("v1") or name.startswith("v2"):
                unzipped_content = bytes_stream.read().decode(ENCODING)
                extracted_file_contents.append(unzipped_content)
    return extracted_file_contents

df = spark.read.format("binaryFile").load("/mnt/file_pattern*")
df = df.withColumn("unzipped_files", unzip_content_udf(F.col("content")))
df.write.mode("overwrite").parquet("/mnt/test_dump_unzipped")
This works well for smaller files, but if I specify one of the larger files (150 MB zipped, 4.5 GB unzipped) the process dies and I get:
Py4JJavaError: An error occurred while calling o1665.parquet.
ValueError: can not serialize object larger than 2G
I guess it makes sense, since the serialization limit is smaller than the unzipped file size.
Do you have any ideas on how to either increase the limit or chunk the unzip operation into pieces that stay below it?
The typical Stack Overflow answer would be: you're doing it wrong.
This seems like a misuse of Spark, as you're not really using Spark features; you're mostly using it to distribute unzipping across multiple nodes of a cluster. E.g. you could've used dispy instead.
can not serialize object larger than 2G
IMO it's very reasonable to say that if a single row in your DataFrame/table is more than 2 GB, then you have some data modelling issues.
The primary problem with trying to do this in a UDF is:
A Java byte array has a max capacity of 2 GB. I don't know if that is the reason for Spark's limitation, but it means you may not be able to throw more money/hardware at this problem and flip some Spark config to serialize 4.5 GB, i.e. run the code you posted as-is on bigger hardware.
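For reference, the same error can be tripped without py7zr at all. A minimal sketch, assuming the Python worker has roughly 4-5 GB of memory free to build and pickle the value (the /tmp/repro output path is just a placeholder):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(returnType=StringType())
def huge_value(_):
    # a single return value just over 2 GB
    return "x" * (2 * 1024**3 + 1)

# the write action fails with: ValueError: can not serialize object larger than 2G
spark.range(1).withColumn("v", huge_value("id")).write.mode("overwrite").parquet("/tmp/repro")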
Overall, IMO you're running into these issues because you're using the wrong tool (Spark) for this.
Options:

1. Run ssh commands on a bunch of worker nodes to unzip the files in parallel across a cluster. Listing the archives can also be done in parallel (ThreadPoolExecutor). See the sketch right after this list.
2. Stay in Spark, but split each archive into sub-groups of inner files so that no single row exceeds the limit, and unzip one sub-group per row; the UDF skeletons and driver code follow the sketch.
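A minimal sketch of the first option, assuming passwordless ssh to the worker hostnames, p7zip installed on them, and the archives sitting on a mount every node can see; the hostnames, paths and output directory below are placeholders, not values from the question:

import subprocess
from concurrent.futures import ThreadPoolExecutor

# Placeholders: your worker hostnames, archive locations and output dir.
WORKERS = ["worker-1", "worker-2", "worker-3"]
ARCHIVES = ["/dbfs/mnt/archives/file1.7z", "/dbfs/mnt/archives/file2.7z"]
OUT_DIR = "/dbfs/mnt/unzipped"

def unzip_on_worker(job):
    host, archive = job
    # run the 7z CLI remotely over ssh; assumes p7zip is installed on the workers
    cmd = ["ssh", host, "7z", "x", "-y", f"-o{OUT_DIR}", archive]
    return archive, subprocess.run(cmd, capture_output=True, text=True).returncode

# round-robin the archives over the workers and run the unzips in parallel
jobs = [(WORKERS[i % len(WORKERS)], a) for i, a in enumerate(ARCHIVES)]
with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    for archive, rc in pool.map(unzip_on_worker, jobs):
        print(archive, "ok" if rc == 0 else f"failed ({rc})")

For the second option, the skeleton looks like this: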
@F.udf(returnType=ArrayType(ArrayType(StringType())))
def create_sub_groups(zip_path):
    # parse the metadata of zip_path to get the list of files inside it and their sizes. No content decoding.
    # create sub-groups of inner files such that each group's total size is less than 2GB (or a few MB, IMO)
    # return the sub-groups
    ...

@F.udf(returnType=ArrayType(StringType()))
def unzip_sub_group(zip_path, sub_group, content):
    # similar to unzip_content_udf, but only decodes the files listed in sub_group
    ...
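To make that concrete, here is a rough sketch of what those two UDFs could look like with py7zr. It's an untested sketch under assumptions: the archives are assumed to be reachable from the workers under a worker-local path (the _local_path helper and MAX_GROUP_BYTES threshold are made up for illustration), py7zr's list() is assumed to expose per-file uncompressed sizes, and it reuses the question's v1/v2 prefix filter and ENCODING constant:

MAX_GROUP_BYTES = 512 * 1024 * 1024  # stay well below the 2G serialization limit (arbitrary threshold)

def _local_path(zip_path):
    # assumption: map the Spark URI to a worker-local path, e.g. "dbfs:/mnt/x.7z" -> "/dbfs/mnt/x.7z",
    # "file:/tmp/x.7z" -> "/tmp/x.7z"; adjust for your mounts
    return zip_path.replace("dbfs:", "/dbfs", 1).replace("file:", "", 1)

@F.udf(returnType=ArrayType(ArrayType(StringType())))
def create_sub_groups(zip_path):
    groups, current, current_size = [], [], 0
    with py7zr.SevenZipFile(_local_path(zip_path), mode='r') as z:
        for info in z.list():  # archive metadata only, no content decoding
            if not (info.filename.startswith("v1") or info.filename.startswith("v2")):
                continue
            if current and current_size + info.uncompressed > MAX_GROUP_BYTES:
                groups.append(current)
                current, current_size = [], 0
            current.append(info.filename)
            current_size += info.uncompressed
    if current:
        groups.append(current)
    return groups

@F.udf(returnType=ArrayType(StringType()))
def unzip_sub_group(zip_path, sub_group, content):
    extracted = []
    with py7zr.SevenZipFile(io.BytesIO(content), mode='r') as z:
        # read(targets=...) extracts only the named members instead of the whole archive
        for name, bytes_stream in z.read(targets=list(sub_group)).items():
            extracted.append(bytes_stream.read().decode(ENCODING))
    return extracted

Keeping the threshold well below 2 GB leaves headroom for pickling overhead and for the still-zipped content column (~150 MB per row here), which gets duplicated into every exploded row.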
df = spark.read.format("binaryFile").load("/mnt/file_pattern*")
df.select('path', 'length', 'content').show()
# +-----------------------+------+--------------------+
# |path                   |length|content             |
# +-----------------------+------+--------------------+
# |file:/path/to/file1.7z |123456|[byte array content]|
# |file:/path/to/file2.7z |123456|[byte array content]|
# +-----------------------+------+--------------------+
df = df.withColumn('sub_groups', create_sub_groups(df.path))
df.show()
# +-----------------------+------+--------------------+-----------------------------------+
# |path                   |length|content             |sub_groups                         |
# +-----------------------+------+--------------------+-----------------------------------+
# |file:/path/to/file1.7z |123456|[byte array content]|[[v1f1, v1f2,..], [v1f3, v2f1,..]] |
# |file:/path/to/file2.7z |123456|[byte array content]|[[v1f5, v2f4,..], [v2f3, v1f9,..]] |
# +-----------------------+------+--------------------+-----------------------------------+
df = df.withColumn('sub_group', F.explode(df.sub_groups)).drop('sub_groups')
df.show()
# +-----------------------+------+--------------------+----------------+
# |path                   |length|content             |sub_group       |
# +-----------------------+------+--------------------+----------------+
# |file:/path/to/file1.7z |123456|[byte array content]|[v1f1, v1f2,..] |
# |file:/path/to/file1.7z |123456|[byte array content]|[v1f3, v2f1,..] |
# |file:/path/to/file2.7z |123456|[byte array content]|[v1f5, v2f4,..] |
# |file:/path/to/file2.7z |123456|[byte array content]|[v2f3, v1f9,..] |
# +-----------------------+------+--------------------+----------------+
df = df.withColumn("unzipped_files", unzip_sub_group(df.path, df.sub_group, df.content))
df.write.mode("overwrite").parquet("/mnt/test_dump_unzipped")