Tags: python, zip, streaming, fastapi, yield

Is it possible to yield one compressed file at a time in FastAPI into a single archive using StreamingResponse?


I have code that creates a zip archive of files and streams it. The problem is that, for large requests, there can be minutes of processing time before any data is streamed, which makes cancelling requests problematic, since the Python code will continue to run. Ideally I would yield each compressed file, one at a time, through a generator function, with the user still receiving the whole zip archive, making cancellation of requests more robust.

Here is a minimal example. If the function yields a single file (N=1), it works fine; if it yields two or more (N>=2), the resulting zip is corrupt:

# test endpoint
from fastapi import APIRouter, Path
from numpy import random
import io, zipfile
from fastapi.responses import StreamingResponse

router = APIRouter(tags=["Make files"])

# make some fake data and zip, then yield 
def make_data(N):

    """
    Make fake data.
    """    

    CHUNK_SIZE = 1024*1024

    for n in range(N):

        content = random.random(100).tobytes()  # bytes, since ZipFile.writestr() needs str or bytes
        name = f'{n:02}.txt'

        # Create new in-memory zip file for each file
        s = io.BytesIO()
        with zipfile.ZipFile(s, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=2) as zf:
            # Add file content to the in-memory zip file
            zf.writestr(name, content)

        # Seek to the beginning of the in-memory zip file
        s.seek(0)

        # Yield the content of the in-memory zip file for the current file
        while chunk := s.read(CHUNK_SIZE):
            yield chunk

# wrap the generator in a StreamingResponse
def stream_data(N):

    """
    Stream the files.
    """

    return StreamingResponse(
        make_data(N), media_type="application/zip",
        headers={"Content-Disposition": f"attachment; filename=download.zip"})

# endpoint
@router.get("/{N}")
async def yield_files(
    N: int = Path(..., description="random files to make")):

    return stream_data(N)

Is there a simple change that could be made to allow this script to work?


Solution

  • I found that the solution is to use the stream-zip package. The docs are extremely well written, and there is an async interface too. The underlying problem with my original code is that each loop iteration writes a complete, standalone zip archive, so the concatenated stream is not one valid archive containing all the files; stream-zip instead builds a single archive incrementally, one member at a time. Overall, the time from processing to the user getting the zip is a little slower I find, but worth it for the benefits.

    But to be thorough, applied to the code in my question, it would look something like this (using the async interface):

    from fastapi import APIRouter, Path
    from fastapi.responses import StreamingResponse
    from numpy import random
    from stream_zip import async_stream_zip, ZIP_32
    from datetime import datetime
    from stat import S_IFREG
    
    router = APIRouter(tags=["Make files"])
    
    # fake data
    async def async_data(n):
        content = random.random(100).tobytes()
        yield content
    
    # Async generator
    async def async_member_files(N: int):
        for n in range(N):
            yield (
                f'{n:02}.txt',          # filename
                datetime.now(),          # modification date
                S_IFREG | 0o600,        # permissions
                ZIP_32,                  # ZIP version
                async_data(n)           # async iterable of bytes
            )
    
    # endpoint
    @router.get("/{N}")
    async def yield_files(N: int = Path(..., description="random files to make")):
        return StreamingResponse(
            async_stream_zip(async_member_files(N)),
            media_type="application/zip",
            headers={"Content-Disposition": "attachment; filename=download.zip"}
        )
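
    For completeness, here is a minimal sketch of how the router above could be wired into an app and the archive downloaded. The module name, route prefix and port are assumptions for illustration, not part of the original code, and stream-zip needs to be installed first (pip install stream-zip).

    # main.py (hypothetical layout: the router above is assumed to live in make_files.py)
    import uvicorn
    from fastapi import FastAPI

    from make_files import router

    app = FastAPI()
    app.include_router(router, prefix="/make-files")

    if __name__ == "__main__":
        uvicorn.run(app, host="127.0.0.1", port=8000)

    # e.g. download an archive of 10 files, saved under the Content-Disposition filename:
    #   curl -OJ http://127.0.0.1:8000/make-files/10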