I have code that creates a zip archive of files and streams it. The problem is that, for large requests, there can be minutes of processing time before any data is streamed, which makes cancelling requests problematic because the Python code keeps running. Ideally I would yield each compressed file, one at a time, through a generator function, and the user would still receive the whole zip archive, making cancellation of requests more robust.
If the function yields a single file (N=1), it works fine. If it yields two or more (N>=2), the zip is corrupt. Here is a minimal working example:
# test endpoint
from fastapi import APIRouter, Path
from numpy import random
import io, zipfile
from fastapi.responses import StreamingResponse

router = APIRouter(tags=["Make files"])


# make some fake data and zip, then yield
def make_data(N):
    """
    Make fake data.
    """
    CHUNK_SIZE = 1024 * 1024
    for n in range(N):
        content = random.random(100)
        name = f'{n:02}.txt'
        # Create new in-memory zip file for each file
        s = io.BytesIO()
        with zipfile.ZipFile(s, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=2) as zf:
            # Add file content to the in-memory zip file
            zf.writestr(name, content)
        # Seek to the beginning of the in-memory zip file
        s.seek(0)
        # Yield the content of the in-memory zip file for the current file
        while chunk := s.read(CHUNK_SIZE):
            yield chunk


# streamingresponse
def stream_data(N):
    """
    Stream the files.
    """
    return StreamingResponse(
        make_data(N), media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=download.zip"})


# endpoint
@router.get("/{N}")
async def yield_files(
        N: int = Path(..., description="random files to make")):
    return stream_data(N)
Is there a simple change that could be made to allow this script to work?
I found that the solution is to use the stream-zip package. Its documentation is extremely well written, and there is an async interface too. In my experience the overall time from processing to the user receiving the zip is slower, but it is worth it for the benefits.
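As for why the generator in the question breaks for N>=2: each loop iteration writes a complete, standalone archive with its own central directory, so the response body is several archives glued together rather than one archive with several members. The following is a minimal sketch that reproduces this with the standard library only; `single_file_zip` is a hypothetical helper written for illustration and the payloads are placeholders.

```python
import io
import zipfile


def single_file_zip(name: str, payload: bytes) -> bytes:
    """Build a complete, standalone zip archive holding one member."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(name, payload)
    return buf.getvalue()


# Roughly what the generator in the question sends for N=2:
# two complete archives, one after the other.
joined = single_file_zip("00.txt", b"first") + single_file_zip("01.txt", b"second")

# A reader locates the central directory from the END of the file,
# so only the last archive's directory is found.
with zipfile.ZipFile(io.BytesIO(joined)) as zf:
    names = zf.namelist()

print(names)
```

Python's `zipfile` tolerates the leading bytes and lists only the last member; other unzip tools may instead report the file as corrupt, which matches what I saw.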
For completeness, the example from my question becomes something like the following, using the async interface:
from datetime import datetime
from stat import S_IFREG

from fastapi import APIRouter, Path
from fastapi.responses import StreamingResponse
from numpy import random
from stream_zip import async_stream_zip, ZIP_32

router = APIRouter(tags=["Make files"])


# fake data
async def async_data(n):
    content = random.random(100).tobytes()
    yield content


# Async generator
async def async_member_files(N: int):
    for n in range(N):
        yield (
            f'{n:02}.txt',    # filename
            datetime.now(),   # modification date
            S_IFREG | 0o600,  # permissions
            ZIP_32,           # zip method (ZIP_32)
            async_data(n)     # async iterable of bytes
        )


# endpoint
@router.get("/{N}")
async def yield_files(N: int = Path(..., description="random files to make")):
    return StreamingResponse(
        async_stream_zip(async_member_files(N)),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=download.zip"}
    )
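If adding a dependency is not an option, a similar effect may be achievable with the standard library alone by keeping a single `ZipFile` open for the whole response and pointing it at a write-only, non-seekable sink. This is only a sketch under that assumption, not something I have used in production: `_ChunkSink` and `iter_zip` are hypothetical names, and each member is still compressed fully in memory before it is yielded.

```python
import io
import zipfile


class _ChunkSink:
    """Write-only, non-seekable file-like object that collects written bytes."""

    def __init__(self):
        self._chunks = []

    def write(self, data):
        self._chunks.append(bytes(data))
        return len(data)

    def flush(self):
        pass

    def drain(self) -> bytes:
        """Return everything written since the last drain and reset."""
        data = b"".join(self._chunks)
        self._chunks.clear()
        return data


def iter_zip(members):
    """Yield the bytes of ONE zip archive, one member at a time.

    `members` is an iterable of (name, payload_bytes) pairs.
    """
    sink = _ChunkSink()
    with zipfile.ZipFile(sink, "w", compression=zipfile.ZIP_DEFLATED,
                         compresslevel=2) as zf:
        for name, payload in members:
            zf.writestr(name, payload)
            # Local header + compressed data for this member only.
            yield sink.drain()
    # Closing the ZipFile writes the single central directory.
    yield sink.drain()


# Check that the streamed chunks form one valid archive.
archive = b"".join(iter_zip([("00.txt", b"a" * 10), ("01.txt", b"b" * 10)]))
with zipfile.ZipFile(io.BytesIO(archive)) as zf:
    print(zf.namelist())
```

The generator could then be passed to `StreamingResponse(iter_zip(...), media_type="application/zip")` in place of `make_data(N)`. The key difference from the code in the question is that the central directory is written once, at the end, instead of once per file.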