I have an endpoint that returns fastapi StreamingResponse:
async def stream_results():
...
async for _output in result_generator:
...
yield (json.dumps({'Field': value}, ensure_ascii=False)) + '\n'
return StreamingResponse(stream_results(), media_type='application/x-ndjson')
value - string that incrementaly increases
Now i want to get response from this service and "restream" it:
@app.post('/test')
async def test_api(request: Request):
async def stream_results():
with requests.post("http://127.0.0.1:8000/test_api", json={}, stream=True) as r:
for chunk in r.iter_content(1000):
print(chunk)
yield chunk
return StreamingResponse(stream_results(), media_type='application/x-ndjson')
The issus is: print works as expected and outputs the result as it is generated. But yield does not. It waits for stream_results to finish and then returns response like that instead of gradual streaming result:
{'Field': "a"}
{'Field': "ab"}
{'Field': "abc"}
What am I doing wrong?
You are using requests(which is synchronous) inside async function.By doing so you blocking event loop until all chunks have been read. You could use httpx or aiohttp e.g:
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post('/test')
async def test_api(request: Request):
async def stream_results():
async with httpx.AsyncClient() as client:
async with client.stream("POST", "http://127.0.0.1:8000/test_api",json={} ) as response:
async for chunk in response.aiter_bytes(chunk_size=1234):
yield chunk
return StreamingResponse(stream_results(), media_type='application/x-ndjson')