pythonfastapi

Python fastapi: yield content of streaming response


I have an endpoint that returns fastapi StreamingResponse:

async def stream_results():
    ...
    async for _output in result_generator:
        ...
        yield (json.dumps({'Field': value}, ensure_ascii=False)) + '\n'


return StreamingResponse(stream_results(), media_type='application/x-ndjson')

value - string that incrementaly increases

Now i want to get response from this service and "restream" it:

@app.post('/test')
async def test_api(request: Request):

    async def stream_results():
        with requests.post("http://127.0.0.1:8000/test_api", json={}, stream=True) as r:
            for chunk in r.iter_content(1000):
                print(chunk)
                yield chunk

    return StreamingResponse(stream_results(), media_type='application/x-ndjson')

The issus is: print works as expected and outputs the result as it is generated. But yield does not. It waits for stream_results to finish and then returns response like that instead of gradual streaming result:

{'Field': "a"}
{'Field': "ab"}
{'Field': "abc"}

What am I doing wrong?


Solution

  • You are using requests(which is synchronous) inside async function.By doing so you blocking event loop until all chunks have been read. You could use httpx or aiohttp e.g:

    import httpx
    from fastapi import FastAPI, Request
    from fastapi.responses import StreamingResponse
    app = FastAPI()
    
    @app.post('/test')
    async def test_api(request: Request):
        async def stream_results():
            async with httpx.AsyncClient() as client:
                async with client.stream("POST", "http://127.0.0.1:8000/test_api",json={} ) as response:
                    async for chunk in response.aiter_bytes(chunk_size=1234):
                        yield chunk
    
        return StreamingResponse(stream_results(), media_type='application/x-ndjson')