pythonmultithreadingfastapigeneratorstarlette

Why does thread disorder occur when the FastAPI multithread return generator?


I want to implement a web service with streaming output using FastAPI. During the pressure test, it is found that for the same request, not the same thread is executed all the time. As a result, some intermediate variables stored in threadlocal are disordered. I have implemented a demo and pressure-tested it. May I ask why doesn't the stream return the same thread for the same request?

import logging
import threading
import time

import uvicorn
from fastapi import FastAPI
from sse_starlette import EventSourceResponse

app = FastAPI()


@app.get("/")
def stream_output():
    logging.warning(f"{threading.current_thread().ident}")
    return EventSourceResponse(num_generator(10), headers={"thread": str(threading.current_thread().ident)})


def num_generator(n):
    for i in range(n):
        logging.warning(f"{threading.currentThread().ident}: %s" % i)
        time.sleep(2)
        yield f"thread: {threading.current_thread().ident} num: {i}"
    logging.warning(f"{threading.current_thread().ident}: end")


if __name__ == "__main__":
    uvicorn.run(app, host='localhost', port=8000)

Pressure test result:

enter image description here

result of one of the requests:

data: thread: 39404 num: 0  
  
data: thread: 39404 num: 1  
  
data: thread: 57624 num: 2  
  
data: thread: 39404 num: 3  
  
data: thread: 52536 num: 4  
  
data: thread: 39404 num: 5  
  
data: thread: 52536 num: 6  
  
data: thread: 39404 num: 7  
  
data: thread: 39404 num: 8  
  
data: thread: 52536 num: 9

I expect same thread for the same request

data: thread: 56052 num: 0

data: thread: 56052 num: 1

data: thread: 56052 num: 2

data: thread: 56052 num: 3

data: thread: 56052 num: 4

data: thread: 56052 num: 5

data: thread: 56052 num: 6

data: thread: 56052 num: 7

data: thread: 56052 num: 8

data: thread: 56052 num: 9

Solution

  • The answer you are looking for should be found, in part, in the source code of the third-party library you are using to send SSE; more specifically, in the implementation of the EventSourceResponse class, which you seem to be returning from the / endpoint.

    Similarly to the official FastAPI/Starlette's StreamingResponse —please have a look at this answer and this answer for more details and explanation—the reason for the various threads appearing in the logs is that when you pass a synchronous generator to EventSourceResponse (i.e., a normal def function instead of async def), such as the num_generator() funciton in your example, EventSourceResponse will execute the def function in a separate thread from an external threadpool, using Starlette's iterate_in_threadpool() (see the relevant implementation of EventSourceResponse).

    Again, please take a look at the linked answers above for more details.