pythonpython-3.xfastapihttpx

How to get the response.headers along with AsyncIterable content from async httpx.stream to a FastAPI StreamingResponse?


I am trying to use httpx in a FastAPI endpoint to download files from a server and return them as a StreamingResponse. For some processing, I need to get the header information along with the data. I want to stream the file data so I came up with this attempt, boiled down to a MRE:

from typing import AsyncIterable

import httpx
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

# Application instance; the route decorator further down registers its handler on it.
app = FastAPI()


class FileStream:
    """Pair the headers of a download with an async iterator over its body."""

    def __init__(self, headers: dict[str, str], stream: AsyncIterable[bytes]):
        # Both values are stored exactly as passed in; nothing is copied or consumed.
        self.headers, self.stream = headers, stream


async def get_file_stream(url) -> FileStream:
    """Issue a streaming GET for *url* and wrap the result in a ``FileStream``.

    The returned object holds the response headers and a not-yet-started async
    generator over the response body. Note that the ``return`` happens inside
    the ``async with`` statement, so the client and the response are closed as
    this function returns, before the caller can iterate the generator.
    """
    async with httpx.AsyncClient() as session, session.stream("GET", url) as upstream:

        async def iter_body() -> AsyncIterable[bytes]:
            # Relay the upstream body one chunk at a time.
            async for piece in upstream.aiter_bytes():
                yield piece

        body = iter_body()
        return FileStream(upstream.headers, body)


@app.get("/download")
async def download_file():
    """Fetch the file behind ``some_url`` and hand its body to a ``StreamingResponse``."""
    upstream = await get_file_stream(url=some_url)

    # some code setting headers and media_type based on file_stream.headers
    response_headers = {}
    content_type = "application/octet-stream"

    return StreamingResponse(
        upstream.stream,
        headers=response_headers,
        media_type=content_type,
    )


if __name__ == "__main__":
    # Serve the app locally with uvicorn when this file is executed as a script.
    import uvicorn

    server_options = {"host": "127.0.0.1", "port": 8000, "log_level": "debug"}
    uvicorn.run(app, **server_options)

This leads to the Error httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.. To my understanding, this is because of the scoping of the context managers in get_file_stream, so I tried to solve it with a wrapping context manager:

from contextlib import asynccontextmanager
from typing import AsyncIterable

import httpx
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

# Application instance; the ``/download`` route below is registered on it.
app = FastAPI()


class FileStream:
    """Bundle response headers with an async iterator over the response body."""

    def __init__(self, headers: dict[str, str], stream: AsyncIterable[bytes]):
        # Keep references to the given objects as-is; no validation or copying.
        self.headers, self.stream = headers, stream

@asynccontextmanager
async def get_file_stream(url) -> FileStream:
    """Async context manager yielding a ``FileStream`` for a streaming GET of *url*.

    The httpx client and the streamed response stay open only while the caller
    is inside the ``async with`` body; both are closed as soon as that body is
    left, including when it is left through a ``return``.
    """
    async with httpx.AsyncClient() as session, session.stream("GET", url) as upstream:

        async def iter_body() -> AsyncIterable[bytes]:
            # Relay the upstream body one chunk at a time.
            async for piece in upstream.aiter_bytes():
                yield piece

        body = iter_body()
        yield FileStream(upstream.headers, body)


@app.get("/download")
async def download_file():
    """Build a ``StreamingResponse`` from the file stream opened for ``some_url``.

    The response object is created and returned from inside the ``async with``
    body, so the context manager exits when this handler returns.
    """
    async with get_file_stream(url=some_url) as upstream:
        # some code setting headers and media_type based on file_stream.headers
        response_headers = {}
        content_type = "application/octet-stream"

        return StreamingResponse(
            upstream.stream,
            headers=response_headers,
            media_type=content_type,
        )

if __name__ == "__main__":
    # Run a local uvicorn server for the app when started as a script.
    import uvicorn

    run_options = {"host": "127.0.0.1", "port": 8000, "log_level": "debug"}
    uvicorn.run(app, **run_options)

However, this leads to the same issue. It seems that I am missing some points here. Any hints on how to solve this?

Update The problem seems to be with how FastAPI handles StreamingResponses. It indeed closes the resources before starting the response streaming. I am trying to find a solid workaround.

Still same for fastapi 0.116.1


Solution

  • The issue is context management. Try wrapping your httpx call within the generator object. This way the stream should stay open while you get data.

    Relevant Code Updates:

    async def get_file_stream(url: str) -> FileStream:
        """Start a streaming GET of ``url`` and return its headers with a lazy body.

        The request is sent here, before the body generator is created, so that
        the upstream response headers are available on the returned
        ``FileStream``; returning an empty dict would leave the caller nothing to
        derive its own headers or media type from. Ownership of the httpx client
        and response then passes to the generator, which closes both when
        iteration finishes, fails, or is closed early. The stream therefore stays
        open while it is being read.

        Args:
            url: Address of the file to download.

        Returns:
            A ``FileStream`` holding the upstream response headers and an async
            iterator over the body chunks.

        Raises:
            Whatever httpx raises while sending the request; the client is
            closed before the exception propagates.
        """
        client = httpx.AsyncClient()
        try:
            # stream=True returns once the headers have arrived, leaving the body unread.
            response = await client.send(client.build_request("GET", url), stream=True)
        except BaseException:
            # BaseException so that task cancellation does not leak the client either.
            await client.aclose()
            raise

        async def chunk_generator() -> AsyncIterable[bytes]:
            try:
                async for chunk in response.aiter_bytes():
                    yield chunk
            finally:
                # Close the response first, then the client, even if the first close fails.
                try:
                    await response.aclose()
                finally:
                    await client.aclose()

        # NOTE(review): cleanup runs only once the generator has been started. A
        # caller that never iterates `stream` must close the resources another way.
        return FileStream(response.headers, chunk_generator())
    
    
    @app.get("/download")
    async def download_file():
        """Relay the file behind ``some_url`` to the caller as a streamed binary response."""
        upstream = await get_file_stream(some_url)

        response_headers = {}
        content_type = "application/octet-stream"

        return StreamingResponse(
            upstream.stream,
            headers=response_headers,
            media_type=content_type,
        )