I want to implement a web service with streaming output using FastAPI. During a load test, I found that the chunks of a single request are not always produced by the same thread. As a result, some intermediate variables stored in thread-local storage (`threading.local`) get mixed up. I have written a demo and load-tested it. Why doesn't the stream use the same thread for the whole request?
```python
import logging
import threading
import time

import uvicorn
from fastapi import FastAPI
from sse_starlette import EventSourceResponse

app = FastAPI()


@app.get("/")
def stream_output():
    # Log the thread that runs the endpoint itself
    logging.warning(f"{threading.current_thread().ident}")
    return EventSourceResponse(num_generator(10), headers={"thread": str(threading.current_thread().ident)})


def num_generator(n):
    for i in range(n):
        logging.warning(f"{threading.current_thread().ident}: {i}")
        time.sleep(2)  # blocking sleep simulates slow work between events
        yield f"thread: {threading.current_thread().ident} num: {i}"
    logging.warning(f"{threading.current_thread().ident}: end")


if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8000)
```
Load test result (output of one of the requests):
```
data: thread: 39404 num: 0
data: thread: 39404 num: 1
data: thread: 57624 num: 2
data: thread: 39404 num: 3
data: thread: 52536 num: 4
data: thread: 39404 num: 5
data: thread: 52536 num: 6
data: thread: 39404 num: 7
data: thread: 39404 num: 8
data: thread: 52536 num: 9
```
I expect the same thread for every chunk of the same request, e.g.:
```
data: thread: 56052 num: 0
data: thread: 56052 num: 1
data: thread: 56052 num: 2
data: thread: 56052 num: 3
data: thread: 56052 num: 4
data: thread: 56052 num: 5
data: thread: 56052 num: 6
data: thread: 56052 num: 7
data: thread: 56052 num: 8
data: thread: 56052 num: 9
```
The answer you are looking for can be found, in part, in the source code of the third-party library you are using to send Server-Sent Events (SSE); more specifically, in the implementation of the `EventSourceResponse` class, which you seem to be returning from the `/` endpoint.
Similarly to the official FastAPI/Starlette `StreamingResponse` (please have a look at this answer and this answer for more details and explanation), the various threads appear in the logs because you pass a synchronous generator to `EventSourceResponse` (i.e., one defined with a normal `def` instead of `async def`), such as the `num_generator()` function in your example. In that case, `EventSourceResponse` iterates over it using Starlette's `iterate_in_threadpool()` (see the relevant implementation of `EventSourceResponse`), which runs each `next()` call on the generator in a worker thread taken from an external threadpool and then `await`s the result. Since every step is submitted to the threadpool as a separate task, consecutive steps of the same generator can be executed by different threads, which is exactly what your output shows.
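To make the per-step dispatch concrete, here is a stdlib-only sketch that mimics what `iterate_in_threadpool()` does. It is not Starlette's actual code (which, as far as I can tell, hands each step to `anyio.to_thread.run_sync()`); `iterate_in_threadpool_sketch`, `_next` and `_DONE` are hypothetical names, and a plain `ThreadPoolExecutor` stands in for AnyIO's worker threads.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Sentinel: asyncio refuses to set StopIteration on a Future, so "exhausted"
# is signalled with a special object instead of an exception.
_DONE = object()


def _next(iterator):
    # Runs in a worker thread: advance the generator by exactly one step.
    return next(iterator, _DONE)


async def iterate_in_threadpool_sketch(iterable, executor):
    """Hypothetical, simplified stand-in for Starlette's iterate_in_threadpool().

    Each next() call is submitted to the pool as an independent task, so
    consecutive steps of the same generator may be picked up by different
    worker threads.
    """
    iterator = iter(iterable)
    loop = asyncio.get_running_loop()
    while True:
        item = await loop.run_in_executor(executor, _next, iterator)
        if item is _DONE:
            break
        yield item


def num_generator(n):
    # Synchronous generator, like the one in the question.
    for i in range(n):
        yield i, threading.get_ident()


async def consume(n):
    with ThreadPoolExecutor(max_workers=4) as pool:
        return [item async for item in iterate_in_threadpool_sketch(num_generator(n), pool)]


loop_thread = threading.get_ident()  # asyncio.run() drives the loop on this thread
results = asyncio.run(consume(10))
print(results)  # (index, worker-thread ident) pairs; idents vary between runs
```

With a single idle client, a pool tends to hand every step to the same free worker, so the effect is easy to miss in manual testing. Under load, as in your test, many streams have steps in flight at once and whichever worker is free picks up the next step.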
Again, please take a look at the linked answers above for more details.
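The consequence for `threading.local` can be reproduced without a server. The sketch below (all names are hypothetical) resumes one synchronous generator on two different threads, which is what the threadpool may do between two events, and shows that a value stored during step 1 is gone in step 2. For contrast, it also iterates an `async def` generator, which stays on the event-loop thread. Assuming `EventSourceResponse` iterates async generators directly on the event loop (as Starlette's `StreamingResponse` does), writing `num_generator()` as `async def` and using `await asyncio.sleep()` instead of `time.sleep()` should keep every chunk of a request on one thread. Note, however, that the event-loop thread is shared by all requests, so thread-local storage still would not isolate concurrent requests; per-request state is safer passed explicitly (or kept in a `contextvars.ContextVar`).

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

ctx = threading.local()  # stands in for the "intermediate variables" in the question


def sync_gen():
    ctx.request_id = "req-1"  # stored in the thread-local slot of whichever thread runs step 1
    yield threading.get_ident()
    # Step 2 may run on a different thread, whose thread-local slot is empty.
    yield getattr(ctx, "request_id", None)


def run_sync_gen_on_two_threads():
    gen = sync_gen()
    # Two single-thread pools guarantee that the two steps run on different threads.
    with ThreadPoolExecutor(max_workers=1) as a, ThreadPoolExecutor(max_workers=1) as b:
        first_thread = a.submit(next, gen).result()
        request_id = b.submit(next, gen).result()
    return first_thread, request_id


async def async_gen(n):
    for _ in range(n):
        await asyncio.sleep(0)  # non-blocking; time.sleep() here would block the event loop
        yield threading.get_ident()


async def collect_async(n):
    return [tid async for tid in async_gen(n)]


first_thread, request_id = run_sync_gen_on_two_threads()
print(request_id)  # None: the value set in step 1 is not visible in step 2

async_threads = asyncio.run(collect_async(5))
print(len(set(async_threads)))  # 1: every step ran on the event-loop thread
```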