Tags: python, pyspark, databricks, 7zip, py7zr

Memory problem when serializing zipped files in PySpark on Databricks


I want to unzip many files in 7z format in PySpark on Databricks. The archives each contain several thousand tiny files.

I read the files using binaryFile and use a UDF to unzip them:

import io

import py7zr
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

schema = ArrayType(StringType())

@F.udf(returnType=schema)
def unzip_content_udf(content):
    extracted_file_contents = []

    with py7zr.SevenZipFile(io.BytesIO(content), mode='r') as z:
        # decode every inner file that matches the prefix filter
        for name, bytes_stream in z.readall().items():
            if name.startswith("v1") or name.startswith("v2"):
                unzipped_content = bytes_stream.read().decode(ENCODING)
                extracted_file_contents.append(unzipped_content)
    return extracted_file_contents

df = spark.read.format("binaryFile").load("/mnt/file_pattern*")
df = df.withColumn("unzipped_files", unzip_content_udf(F.col("content")))
df.write.mode("overwrite").parquet("/mnt/test_dump_unzipped")

This works well for smaller files, but when I point it at one of the larger files (150 MB zipped, 4.5 GB unzipped), the process dies and I get:

Py4JJavaError: An error occurred while calling o1665.parquet.
ValueError: can not serialize object larger than 2G

I guess it makes sense, since the serialization limit is smaller than the unzipped file size.

Do you have any ideas on how to either increase the limit, or chunk the unzip operation so each piece stays below it?


Solution

  • The typical Stack Overflow answer would be: you're doing it wrong.

    This seems like a misuse of Spark, as you're not really using Spark features; you're mostly using it to distribute unzipping across multiple nodes of a cluster. E.g. you could have used dispy instead.

    can not serialize object larger than 2G

    IMO it's very reasonable to say that if a single row in your DataFrame/table is more than 2 GB, you have some data-modelling issues.

    The primary problems with trying to do this using a UDF are:

    A Java byte array has a maximum capacity of 2 GB. I don't know if this is the reason for Spark's limitation, but it means you might not be able to throw more money/hardware at this problem and change some Spark config to serialize 4.5 GB, i.e. run the code you posted as-is on bigger hardware.

    Overall, IMO you're running into these issues because you're using the wrong tool (Spark) for the job.


    Options:

    1. If you're really not in a hurry (performance-wise), just use a ThreadPoolExecutor or something and unzip all the files with simple multi-threaded Python code (a minimal sketch is at the end of this answer). The catch is that it doesn't scale horizontally.
    2. If you have zillions of files and petabytes of data and option 1 would take years:
      • Write a simple Python program to list all the files and then issue ssh commands to a bunch of worker nodes to unzip them in parallel across a cluster. The listing itself can be parallelized (ThreadPoolExecutor).
      • Or use something like dispy for the unzipping. Redistribute the unzipped files into groups smaller than 2 GB, then use Spark to read the redistributed, unzipped files and write them back as Parquet. There are other frameworks if you prefer.
    3. Using a UDF: make it a two-pass process. The first pass creates sub-groups of inner files; the second actually decodes them. (A fleshed-out version of these UDFs is sketched at the end of this answer.)
    
    @F.udf(returnType=ArrayType(ArrayType(StringType())))
    def create_sub_groups(zip_path):
      # Parse the metadata of zip_path to get the list of inner files and their sizes. No content decoding.
      # Create sub-groups of inner files such that each group's total size is less than 2GB (or even just a few MB, IMO).
      # Return the sub-groups.

    @F.udf(ArrayType(StringType()))
    def unzip_sub_group(zip_path, sub_group, content):
      # Similar to unzip_content_udf, but only decodes the files listed in sub_group.
    
    
    df = spark.read.format("binaryFile").load("/mnt/file_pattern*")
    
    df.select('path', 'length', 'content').show()
    # +-----------------------+------+--------------------+
    # |path                   |length|content             |
    # +-----------------------+------+--------------------+
    # |file:/path/to/file1.7z |123456|[byte array content]|
    # |file:/path/to/file2.7z |123456|[byte array content]|
    # +-----------------------+------+--------------------+
    
    df = df.withColumn('sub_groups', create_sub_groups(df.path))
    df.show()
    # +-----------------------+------+--------------------+-----------------------------------+
    # |path                   |length|content             |sub_groups                         |
    # +-----------------------+------+--------------------+-----------------------------------+
    # |file:/path/to/file1.7z |123456|[byte array content]|[[v1f1, v1f2,..], [v1f3, v2f1,..]] |
    # |file:/path/to/file2.7z |123456|[byte array content]|[[v1f5, v2f4,..], [v2f3, v1f9,..]] |
    # +-----------------------+------+--------------------+-----------------------------------+
     
    df = df.withColumn('sub_group', F.explode(df.sub_groups)).drop('sub_groups')
    df.show()
    # +-----------------------+------+--------------------+----------------+
    # |path                   |length|content             |sub_group       |
    # +-----------------------+------+--------------------+----------------+
    # |file:/path/to/file1.7z |123456|[byte array content]|[v1f1, v1f2,..] |
    # |file:/path/to/file1.7z |123456|[byte array content]|[v1f3, v2f1,..] |
    # |file:/path/to/file2.7z |123456|[byte array content]|[v1f5, v2f4,..] |
    # |file:/path/to/file2.7z |123456|[byte array content]|[v2f3, v1f9,..] |
    # +-----------------------+------+--------------------+----------------+
    
    df = df.withColumn("unzipped_files", unzip_sub_group(df.path, df.sub_group, df.content))
    
    df.write.mode("overwrite").parquet("/mnt/test_dump_unzipped")
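
    For concreteness, here is one way the two UDFs above could look. This is a minimal sketch, not a tested implementation: it assumes py7zr's list() (per-file metadata with filename and uncompressed size) and read(targets=...) (extract only the named members), it parses the metadata from the already-loaded content column instead of re-opening the archive by path, and GROUP_LIMIT, ENCODING and the "v1"/"v2" filter are placeholders carried over from the question. With these signatures the pipeline calls become create_sub_groups(df.content) and unzip_sub_group(df.content, df.sub_group).

    import io

    import py7zr
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StringType

    GROUP_LIMIT = 256 * 1024 * 1024  # keep each sub-group's uncompressed size well below the 2 GB limit
    ENCODING = "utf-8"               # assumption: same encoding as in the question

    @F.udf(returnType=ArrayType(ArrayType(StringType())))
    def create_sub_groups(content):
        # Pass 1: read only the archive metadata and bin the inner files into sub-groups.
        groups, current, current_size = [], [], 0
        with py7zr.SevenZipFile(io.BytesIO(content), mode='r') as z:
            for info in z.list():  # metadata only, nothing is decoded here
                if info.is_directory:
                    continue
                if not (info.filename.startswith("v1") or info.filename.startswith("v2")):
                    continue
                if current and current_size + info.uncompressed > GROUP_LIMIT:
                    groups.append(current)
                    current, current_size = [], 0
                current.append(info.filename)
                current_size += info.uncompressed
        if current:
            groups.append(current)
        return groups

    @F.udf(returnType=ArrayType(StringType()))
    def unzip_sub_group(content, sub_group):
        # Pass 2: decode only the files belonging to this sub-group.
        with py7zr.SevenZipFile(io.BytesIO(content), mode='r') as z:
            extracted = z.read(targets=sub_group)
        return [bytes_stream.read().decode(ENCODING) for bytes_stream in extracted.values()]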
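
    And for option 1, a minimal single-machine sketch with a ThreadPoolExecutor, no Spark involved. The glob pattern, worker count, ENCODING and the "v1"/"v2" filter are placeholders; decompression in py7zr is largely done in C extensions that can release the GIL, so threads may help, but swap in a ProcessPoolExecutor if they don't.

    import glob
    from concurrent.futures import ThreadPoolExecutor

    import py7zr

    ENCODING = "utf-8"  # assumption

    def unzip_one(path):
        # Unzip a single archive and return the decoded contents of the files we care about.
        contents = []
        with py7zr.SevenZipFile(path, mode='r') as z:
            for name, bytes_stream in z.readall().items():
                if name.startswith("v1") or name.startswith("v2"):
                    contents.append(bytes_stream.read().decode(ENCODING))
        return path, contents

    paths = glob.glob("/dbfs/mnt/file_pattern*")  # hypothetical local/mounted location of the archives
    with ThreadPoolExecutor(max_workers=8) as pool:
        for path, contents in pool.map(unzip_one, paths):
            # Persist or post-process the extracted contents here.
            print(path, len(contents))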