Safely extract uploaded ZIP files in Python


I'm working on a Python REST API that allows users to upload ZIP files. Before extracting them, I want to protect against common vulnerabilities, especially zip bombs. Is there a way (ideally based on standard libraries like zipfile) to safely validate and extract ZIP uploads in Python?

I looked into third-party libraries like defusedzip and python-securezip, but they seem outdated and unmaintained.

I also checked this related Stack Overflow question, but I could not find anything there that mentions protection against zip bombs.


Solution

  • The only direct detection of Fifield zip bombs that I'm aware of is the one that I wrote for Info-ZIP's unzip. You could try to adapt that code, or run unzip -t externally (using a build with those modifications) to check a zip file for bombs before extracting it.

    To write your own detector, you would read the central directory to get the starting offset and length of all of the entries. You would then look for any cases where two entries overlap. If there are any overlaps, then it is not a valid zip file, and it is likely a zip bomb.
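The overlap test on its own can be sketched as follows. This is a minimal illustration, assuming the spans have already been collected from the central directory as (start, end) byte offsets, with end being one past the last byte of the entry. The name has_overlap is hypothetical and not part of any library.

```python
def has_overlap(spans):
    """Return True if any two half-open (start, end) spans overlap."""
    # After sorting by start offset, any overlap anywhere in the list
    # implies an overlap between some pair of neighbours, so comparing
    # neighbours is enough.
    ordered = sorted(spans)
    for (_, prev_end), (next_start, _) in zip(ordered, ordered[1:]):
        if prev_end > next_start:
            return True
    return False


# Two entries that touch but do not overlap, then two that do overlap.
print(has_overlap([(0, 10), (10, 20)]))
print(has_overlap([(0, 10), (5, 20)]))
```

Gaps between spans are not treated as errors here, only overlaps.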

    Update:

    Here is Python code to inspect a zip file for zip bombs:

    '''
      beagle.py -- detect zip bombs
      Copyright (C) 2025 Mark Adler
      Version 1.0  5 May 2025  Mark Adler
    
      This software is provided 'as-is', without any express or implied
      warranty.  In no event will the author be held liable for any damages
      arising from the use of this software.
    
      Permission is granted to anyone to use this software for any purpose,
      including commercial applications, and to alter it and redistribute it
      freely, subject to the following restrictions:
    
      1. The origin of this software must not be misrepresented; you must not
         claim that you wrote the original software. If you use this software
         in a product, an acknowledgment in the product documentation would be
         appreciated but is not required.
      2. Altered source versions must be plainly marked as such, and must not be
         misrepresented as being the original software.
      3. This notice may not be removed or altered from any source distribution.
    
      Mark Adler
      madler@alumni.caltech.edu
    '''
    
    # Examine a zip file and determine if it has any overlapping records. If it
    # does, it is an invalid zip file, and is likely a zip bomb. See David
    # Fifield's clever zip bomb construction and examples here:
    #
    #   https://www.bamsoftware.com/hacks/zipbomb/
    
    import sys
    import io
    import os
    import struct
    
    def bomb(zip):
        # If zip is a valid zip file, return True if there are any overlapping
        # records, or False if not. If zip is not a valid zip file, the central
        # directory is compressed and/or encrypted, or the file is from a
        # multiple-disk archive, then return None. zip must be a file object opened
        # in binary mode, and must be seekable.
    
        def is_end(rec, pos):
            # Return a central directory tuple if rec is valid end record data,
            # or False otherwise. pos is the offset in the zip file zip of the
            # end signature (zip is non-local). The returned tuple is the
            # number of entries in the central directory, its length in bytes,
            # and its offset in the zip file. rec is the 18 bytes of the
            # candidate end record that follows an end record signature. Upon
            # finding a valid end record, ends (non-local) is updated with the
            # spans of the end record and Zip64 records (if any), and of the
            # central directory. Each span is the starting offset and one past
            # the ending offset of the record. size (non-local) is the size of
            # the zip file in bytes.
            end = struct.unpack('<HHHHLLH', rec)
            if end[0] != 0 or end[1] != 0 or pos + 22 + end[6] > size:
                # Multiple disks (not supported), or comment past end of file.
                return False
            nonlocal ends
            ends.append((pos, pos + 22 + end[6]))
            end = end[2:6]
            if end[0] == 0xffff or end[1] == 0xffff or \
               end[2] == 0xffffffff or end[3] == 0xffffffff:
                # Get the Zip64 end of central directory locator, which immediately
                # precedes the end record.
                if pos < 20:
                    # Not enough bytes before the end record for the locator.
                    return False
                zip.seek(pos - 20, os.SEEK_SET)
                loc = struct.unpack('<LLQL', zip.read(20))
                if loc[0] != 0x07064b50 or loc[1] != 0 or loc[3] != 1 or \
                   loc[2] + 56 > size:
                    # Invalid locator.
                    return False
                ends.append((pos - 20, pos))
                # Get the Zip64 end of central directory record.
                zip.seek(loc[2], os.SEEK_SET)
                end = struct.unpack('<LQHHLLQQQQ', zip.read(56))
                if end[0] != 0x06064b50 or end[1] < 44 or end[4] != 0 or \
                   end[5] != 0:
                   # Invalid Zip64 end record.
                   return False
                ends.append((loc[2], loc[2] + 12 + end[1]))
                end = end[6:10]
            if end[0] != end[1] or end[3] + end[2] > size:
                # Multiple disks, or central directory would go past end of file.
                return False
            ends.append((end[3], end[3] + end[2]))
            ends.reverse()
            return end[1:]
            # end is_end()
    
        def central():
            # Return a tuple with the number of entries, length in bytes, and
            # offset of the central directory in the zip file zip (non-local) with
            # size size (non-local), or False if the requisite end records cannot
            # be found.
            # Search the zip file starting at the end for a valid end record as
            # well as associated Zip64 records if indicated. On success, update
            # ends (non-local) with the spans of the end and Zip64 records and the
            # central directory. The span of the end record includes its comment,
            # and the span of the Zip64 end record includes its extension.
    
            # Search backwards for the first valid end record and associated Zip64
            # records as indicated by the end record. For the first attempt, the
            # far and away most common case is tried, which is the end record in
            # the last 22 bytes of the file. If it's not there, then we start
            # searching backwards with the remainder of the last block of zip
            # starting on an 8K boundary, or the previous 8K if the last 22 bytes
            # happens to start on an 8K boundary. The search continues backwards
            # from there if needed, one 8K block at a time.
            block = 8192                        # must be a power of two
            beg = size
            back = 22
            buf = b''
            i = 1
            while True:
                beg -= back
                if beg < 0:
                    # File has been searched with no valid end record found.
                    return False
                zip.seek(beg, os.SEEK_SET)
                buf = zip.read(back) + buf[:21]
                while i > 0:
                    i -= 1
                    if buf[i] == 0x50 and buf[i + 1] == 0x4b and \
                       buf[i + 2] == 5 and buf[i + 3] == 6:
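                        # Found an end record signature -- check the rest.
                        # First discard any spans left behind by an earlier
                        # candidate that was rejected after it had been recorded.
                        ends.clear()
                        end = is_end(buf[i + 4:i + 22], beg + i)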
                        if end:
                            # Good end record.
                            return end
                back = ((beg - 1) & (block - 1)) + 1
                i = back
            # Loop does not exit here.
            # end central()
    
        # bomb() body
    
        # Find the central directory. Add the spans of the end records and the
        # central directory to ends.
        ends = []
        zip.seek(0, os.SEEK_END)
        size = zip.tell()
        dir = central()
        if not dir:
            # Not a zip file, invalid, or unsupported.
            return None
    
        # Read in the central directory and process it. Add the spans of the local
        # entries to spans.
        spans = []
        (num, end, pos) = dir
        zip.seek(pos, os.SEEK_SET)
        cent = zip.read(end)
        i = 0                       # cent is processed sequentially with index i
        while num > 0:
            if i + 46 > end:
                # Header would go past end of directory.
                break
            head = struct.unpack('<LHHHHHHLLLHHHHHLL', cent[i:i + 46])
            i += 46
            if head[0] != 0x02014b50:
                # Not a central directory header signature.
                break
            skip = head[10] + head[11] + head[12]
            if i + skip > end:
                # Header would go past end of directory.
                break
            clen = head[8]
            ulen = head[9]
            disk = head[13]
            off = head[16]
            if clen == 0xffffffff or ulen == 0xffffffff or \
               disk == 0xffff or off == 0xffffffff:
                # Need to get the real deal from the Zip64 extra field.
                good = False
                i += head[10]
                xend = i + head[11]
                while i + 4 <= xend:
                    # Go through each data field until a Zip64 field is found.
                    (id, data) = struct.unpack('<HH', cent[i:i + 4])
                    i += 4
                    if i + data > xend:
                        # Extra data block would go past end of extra field.
                        break
                    dend = i + data
                    if id == 1:
                        # Zip64 extended information extra field.
                        if ulen == 0xffffffff:
                            if i + 8 > dend:
                                # Premature end of Zip64 field.
                                break
                            ulen = struct.unpack('<Q', cent[i:i + 8])[0]
                            i += 8
                        if clen == 0xffffffff:
                            if i + 8 > dend:
                                # Premature end of Zip64 field.
                                break
                            clen = struct.unpack('<Q', cent[i:i + 8])[0]
                            i += 8
                        if off == 0xffffffff:
                            if i + 8 > dend:
                                # Premature end of Zip64 field.
                                break
                            off = struct.unpack('<Q', cent[i:i + 8])[0]
                            i += 8
                        if disk == 0xffff:
                            if i + 4 > dend:
                                # Premature end of Zip64 field.
                                break
                            disk = struct.unpack('<L', cent[i:i + 4])[0]
                            i += 4
                        if i != dend:
                            # Zip64 field is the wrong size.
                            break
                        # The Zip64 field was correct, needed values updated.
                        good = True
                        break
                    else:
                        i = dend
                if not good:
                    # Needed a Zip64 field, but it was missing or invalid.
                    break
                i = xend + head[12]
            else:
                i += skip
            if disk != 0:
                # Multiple disks.
                break
            if off + 30 > size:
                # Local header past end of file.
                break
            zip.seek(off, os.SEEK_SET)
            local = struct.unpack('<LHHHHHLLLHH', zip.read(30))
            if local[0] != 0x04034b50:
                # Not a local header signature.
                break
            lend = off + 30 + local[9] + local[10] + clen   # end of local entry
            if lend > size:
                # Local entry past end of file.
                break
            if local[2] & 8 != 0:
                # There is a data descriptor after the compressed data. Determine
                # its length. We need to try all four possibilities, from longest
                # to shortest. Update lend to include the data descriptor.
                crc = head[7]
                zip.seek(lend, os.SEEK_SET)
                desc = zip.read(24)
                d24 = struct.unpack('<LLQQ', desc[:24]) if len(desc) == 24 else ()
                d20 = struct.unpack('<LQQ', desc[:20]) if len(desc) >= 20 else ()
                d16 = struct.unpack('<LLLL', desc[:16]) if len(desc) >= 16 else ()
                d12 = struct.unpack('<LLL', desc[:12]) if len(desc) >= 12 else ()
                if len(desc) == 24 and d24[0] == 0x08074b50 and \
                   d24[1] == crc and d24[2] == clen and d24[3] == ulen:
                    lend += 24
                elif len(desc) >= 20 and \
                     d20[0] == crc and d20[1] == clen and d20[2] == ulen:
                    lend += 20
                elif len(desc) >= 16 and d16[0] == 0x08074b50 and \
                     d16[1] == crc and d16[2] == clen and d16[3] == ulen:
                    lend += 16
                elif len(desc) >= 12 and \
                     d12[0] == crc and d12[1] == clen and d12[2] == ulen:
                    lend += 12
                else:
                    # No valid data descriptor found.
                    break
            spans.append((off, lend))
            num -= 1
        else:
            # Central directory has been processed.
            if i == end:
                # The central directory was the expected length. Look for overlaps
                # of the records. For normal zip files, spans will already be
                # sorted, in which case the Python Timsort will take O(n) time.
                spans += ends
                spans.sort()
                this = spans[0]
                if verbose and this[0] != 0:
                    # The first this[0] bytes of the zip file have no data. This
                    # may be an executable zip file, with a decompressor there,
                    # or there may be a spanning signature at the start. This is
                    # not normal, but is not invalid.
                    print(f'!! {zip.name} has {this[0]} unused '
                          f'byte{"" if this[0] == 1 else "s"} at the start',
                          file=sys.stderr)
                for next in spans[1:]:
                    if this[1] > next[0]:
                        # Overlap! This is a zip bomb or a corrupted zip file.
                        return True
                    elif verbose and this[1] < next[0]:
                        # Underlap. This is not normal, but is not invalid.
                        print(f'!! {zip.name} has {next[0] - this[1]} unused '
                              f'byte{"" if next[0] - this[1] == 1 else "s"} '
                              f'between records',
                              file=sys.stderr)
                    this = next
                if verbose and this[1] != size:
                    # The last size - this[1] bytes of the zip file have no data.
                    # This is not normal, but is not invalid.
                    print(f'!! {zip.name} has {size - this[1]} unused '
                          f'byte{"" if size - this[1] == 1 else "s"} at the end',
                          file=sys.stderr)
                # Good zip file with no overlapping records.
                return False
        # The central directory while loop exited with a break, indicating an
        # error, or the central directory was not the expected length. This is not
        # a valid zip file.
        return None
        # end bomb()
    
    # main body
    
    # Process the zip files on the command line. The -v (verbose) option will show
    # any gaps of unused space between zip file records.
    verbose = False
    for path in sys.argv[1:]:
        if path[0] == '-':
            if path == '-v':
                verbose = True
            else:
                print(f'?? unknown option: {path}', file=sys.stderr)
    for path in sys.argv[1:]:
        if path[0] == '-':
            continue
        with open(path, 'rb') as zip:
            ret = bomb(zip)
            if ret is None:
                print(f'{path} is not a zip file or is invalid or unsupported')
            elif ret:
                print(f'{path} is a zip bomb! ** do not extract **')
            else:
                print(f'{path} is good')
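
The overlap check only flags archives whose records overlap. An archive with a single, very highly compressed entry has no overlaps and would pass it, so a REST API would probably want size limits as well. Below is a minimal sketch of such a second layer using only the standard zipfile module. The names safe_extract and UnsafeArchive and the default limits are assumptions made for illustration, not an established API or recommended values.

```python
import os
import zipfile

CHUNK = 64 * 1024  # read size while streaming each entry


class UnsafeArchive(Exception):
    """Raised when an archive exceeds a limit or tries to leave dest."""


def safe_extract(fileobj, dest, max_entries=1000, max_total=100 * 1024 * 1024):
    """Extract fileobj (seekable, binary) into dest; return bytes written.

    max_entries and max_total are illustrative defaults only.
    """
    dest = os.path.realpath(dest)
    written = 0
    with zipfile.ZipFile(fileobj) as zf:
        infos = zf.infolist()
        if len(infos) > max_entries:
            raise UnsafeArchive('too many entries')
        # Cheap early rejection based on the sizes the archive declares.
        if sum(info.file_size for info in infos) > max_total:
            raise UnsafeArchive('declared uncompressed size exceeds limit')
        for info in infos:
            # Reject names such as ../x or /etc/x that resolve outside dest.
            target = os.path.realpath(os.path.join(dest, info.filename))
            try:
                inside = os.path.commonpath([dest, target]) == dest
            except ValueError:  # e.g. different drives on Windows
                inside = False
            if not inside:
                raise UnsafeArchive(f'path escapes destination: {info.filename}')
            if info.is_dir():
                os.makedirs(target, exist_ok=True)
                continue
            os.makedirs(os.path.dirname(target), exist_ok=True)
            with zf.open(info) as src, open(target, 'wb') as out:
                # Also count the bytes actually written, as a second check.
                while chunk := src.read(CHUNK):
                    written += len(chunk)
                    if written > max_total:
                        raise UnsafeArchive('extracted data exceeds limit')
                    out.write(chunk)
    return written
```

In an upload handler, one possible order is to run bomb() on the upload first and call something like safe_extract only if it returns False. bomb() needs a seekable binary file object, so an in-memory upload wrapped in io.BytesIO should work as long as verbose is false, since the verbose messages read zip.name. Because a failed extraction can leave partial files behind, extracting into a fresh temporary directory and discarding it on error is the safer design.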