
Processing requests in FastAPI sequentially while staying responsive

My server exposes an API for resource-intensive rendering work. The job involves a GPU, so the server can handle only a single request at a time. Clients should submit a job and receive a 201 (Created) response immediately afterwards. The processing can take up to a minute, and there can be a few dozen requests queued at once.

Here's what I came up with, boiled down to a minimal reproducible example:

import time
import asyncio
from fastapi import FastAPI, status

app = FastAPI()
fifo_queue = asyncio.Queue()

async def process_requests():
    while True:
        name = await fifo_queue.get()  # Wait for a request from the queue
        time.sleep(10)  # Simulate the rendering job (this is the blocking call discussed below)
        fifo_queue.task_done()  # Indicate that the request has been processed

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(process_requests())  # Start the request processing task

@app.post("/render", status_code=status.HTTP_201_CREATED)  # route path assumed from the handler name
async def render(name):
    fifo_queue.put_nowait(name)  # Add the request parameter to the queue
    return {"queued": name}  # Respond immediately; the 201 status is set in the decorator

The problem with this approach is that the server does not stay responsive. After the first request is sent, the server is fully occupied with it and does not respond to anything else, which is not what I had hoped for.


In this example, simply replacing time.sleep(10) with await asyncio.sleep(10) solves the problem, but that is not an option in the real use case (though it possibly offers a clue as to what I am doing incorrectly).

Any ideas?


  • As you may have figured out already, the main issue in your example is that a synchronous blocking operation (time.sleep()) runs inside an async def function, which blocks the event loop (of the main thread) and hence the entire server. As explained in this answer, if one has to use an async def endpoint, they could run such CPU-bound tasks in an external ProcessPool and then await the result (using asyncio's loop.run_in_executor()). Awaiting returns control to the event loop, which allows other tasks in the event loop to run until that task is completed. Please have a look at the linked answer above, as well as this answer, for more details. As explained in the linked answers, when using ProcessPoolExecutor on Windows (or on any platform that starts worker processes with the spawn method), it is important to protect the entry point of the program to avoid recursive spawning of subprocesses. Basically, the code that starts the server must be under if __name__ == '__main__' (as shown in the example below).
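
To make the difference visible, here is a minimal sketch that counts how often a heartbeat coroutine gets to run while a blocking call is in progress. It is not part of the original example: blocking_job, heartbeat and count_ticks are hypothetical names, and it offloads to the loop's default thread pool (by passing None to run_in_executor()) rather than to a ProcessPoolExecutor, only so that the snippet runs without the __main__ guard. For genuinely CPU-bound work the process pool described above is still the better fit.

```python
import asyncio
import time


def blocking_job(seconds: float) -> str:
    """Hypothetical stand-in for the real rendering call."""
    time.sleep(seconds)
    return "ok"


async def heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Increment a counter every time the event loop lets this task run."""
    while True:
        await asyncio.sleep(interval)
        ticks[0] += 1


async def count_ticks(offload: bool, seconds: float = 0.2) -> int:
    """Return how many heartbeats fired while the blocking job was running."""
    ticks = [0]
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        loop = asyncio.get_running_loop()
        # None selects the loop's default ThreadPoolExecutor (an assumption
        # made for this sketch; the answer above recommends a process pool).
        await loop.run_in_executor(None, blocking_job, seconds)
    else:
        blocking_job(seconds)  # runs on the event loop thread and stalls it
    observed = ticks[0]
    hb.cancel()
    return observed
```

Calling asyncio.run(count_ticks(offload=False)) should report no heartbeats at all, because nothing else can run while time.sleep() holds the loop, whereas asyncio.run(count_ticks(offload=True)) should report several.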

    I would also suggest using a lifespan handler, as demonstrated in this answer and this answer, instead of the deprecated startup and shutdown event handlers. The lifespan handler can start the process_requests function and instantiate the asyncio.Queue() and the ProcessPoolExecutor, then add them to request.state, so that they can be shared and re-used by every request/endpoint. This matters especially for the ProcessPoolExecutor: creating a new ProcessPool every time is best avoided, as setting up and destroying processes over and over can become computationally expensive.
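
The start-up and clean-up pattern can be sketched with plain asyncio, independent of FastAPI. This is only an illustration under stated assumptions: worker and worker_lifespan are hypothetical names, the "processing" is a placeholder, and a real FastAPI lifespan function would additionally take the app argument and be passed as FastAPI(lifespan=...). The sketch also keeps a reference to the background task and cancels it on exit, since asyncio itself holds only a weak reference to tasks created with create_task().

```python
import asyncio
from contextlib import asynccontextmanager, suppress


async def worker(q: asyncio.Queue, done: list) -> None:
    """Consume queued names one at a time, in FIFO order."""
    while True:
        name = await q.get()
        done.append(name)  # placeholder for the real processing step
        q.task_done()


@asynccontextmanager
async def worker_lifespan():
    """Create the shared state on entry and stop the worker on exit."""
    q: asyncio.Queue = asyncio.Queue()
    done: list = []
    task = asyncio.create_task(worker(q, done))  # keep a reference to the task
    state = {"q": q, "done": done, "task": task}
    try:
        yield state
    finally:
        task.cancel()  # ask the worker to stop
        with suppress(asyncio.CancelledError):
            await task  # wait until the cancellation has been processed


async def demo() -> dict:
    async with worker_lifespan() as state:
        for name in ("a", "b", "c"):
            state["q"].put_nowait(name)
        await state["q"].join()  # block until every queued item is processed
    return state
```

Running asyncio.run(demo()) processes the three names in submission order and leaves the worker task cancelled once the context manager exits.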

    Further, I would suggest creating a unique ID for every incoming request and returning that ID to the client, so that they can use it to check on the status of their request, i.e., whether it is completed or still pending. You could save that ID to your database storage (or a key-value store, such as Redis), as explained in this answer; however, for simplicity and demo purposes, the example below uses a dict object for that purpose. It should also be noted that in the example below the ID is expected as a query parameter to the /status endpoint, but in real-world scenarios you should never pass sensitive information in the query string, as this would pose security/privacy risks (see Solution 1 of this answer, where some of the risks are outlined). You should instead pass sensitive information in the request body, and always use the HTTPS protocol.

    Working Example

    from fastapi import FastAPI, Request
    from fastapi.responses import JSONResponse
    from contextlib import asynccontextmanager
    from dataclasses import dataclass
    from concurrent.futures import ProcessPoolExecutor
    import time
    import asyncio
    import uuid


    @dataclass
    class Item:
        id: str
        name: str


    # Simulating a Computationally Intensive Task
    def cpu_bound_task(item: Item):
        print(f"Processing: {item.name}")
        time.sleep(15)  # stand-in for the real work; the duration is arbitrary
        return 'ok'


    async def process_requests(q: asyncio.Queue, pool: ProcessPoolExecutor):
        while True:
            item = await q.get()  # Get a request from the queue
            loop = asyncio.get_running_loop()
            fake_db[item.id] = 'Processing...'
            r = await loop.run_in_executor(pool, cpu_bound_task, item)
            q.task_done()  # tell the queue that the processing on the task is completed
            fake_db[item.id] = 'Done.'


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        q = asyncio.Queue()  # note that asyncio.Queue() is not thread safe
        pool = ProcessPoolExecutor()
        # Keep a reference to the task so that it is not garbage-collected mid-execution
        worker = asyncio.create_task(process_requests(q, pool))  # Start the requests processing task
        yield {'q': q, 'pool': pool}
        worker.cancel()  # stop the processing loop on shutdown
        pool.shutdown()  # free any resources that the pool is using when the currently pending futures are done executing


    fake_db = {}
    app = FastAPI(lifespan=lifespan)


    @app.post("/add")  # route path assumed; it was lost from this copy of the example
    async def add_task(request: Request, name: str):
        item_id = str(uuid.uuid4())
        item = Item(item_id, name)
        request.state.q.put_nowait(item)  # Add request to the queue
        fake_db[item_id] = 'Pending...'
        return item_id


    @app.get("/status")
    async def check_status(item_id: str):
        if item_id in fake_db:
            return {'status': fake_db[item_id]}
        else:
            return JSONResponse("Item ID Not Found", status_code=404)


    if __name__ == '__main__':
        import uvicorn
        uvicorn.run(app)
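
One thing the example above leaves open is what happens when a job raises. The exception propagates out of the awaited run_in_executor() call, and since process_requests does not catch it, the while loop ends and later requests stay pending. Below is a minimal sketch of one way to guard against that. It rests on assumptions not found in the original: render, submit and the 'Failed' status are hypothetical, statuses plays the role of fake_db, and the loop's default thread pool is used instead of a ProcessPoolExecutor so that the snippet is self-contained.

```python
import asyncio
import uuid

statuses: dict = {}  # plays the role of fake_db in the example above


def render(name: str) -> str:
    """Hypothetical job that fails for one particular input."""
    if name == "bad":
        raise ValueError("render failed")
    return f"rendered {name}"


async def process_requests(q: asyncio.Queue) -> None:
    loop = asyncio.get_running_loop()
    while True:
        item_id, name = await q.get()
        statuses[item_id] = "Processing..."
        try:
            # None selects the default thread pool (an assumption for this sketch)
            await loop.run_in_executor(None, render, name)
        except Exception as exc:
            statuses[item_id] = f"Failed: {exc}"  # record the error and keep going
        else:
            statuses[item_id] = "Done."
        finally:
            q.task_done()  # always mark the item as handled


def submit(q: asyncio.Queue, name: str) -> str:
    """Queue a job and return the ID the client can poll with."""
    item_id = str(uuid.uuid4())
    statuses[item_id] = "Pending..."
    q.put_nowait((item_id, name))
    return item_id


async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(process_requests(q))
    ids = [submit(q, name) for name in ("first", "bad", "last")]
    await q.join()  # wait until all three jobs have been handled
    task.cancel()
    return [statuses[i] for i in ids]
```

With this structure, asyncio.run(demo()) returns ['Done.', 'Failed: render failed', 'Done.']: the failing job is reported through its status, and the job queued after it is still processed.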


    If you encounter a memory leak (i.e., memory that is no longer needed but is not released) when re-using a ProcessPoolExecutor, for instance because of an issue in some third-party library that you are using, you could instead create a new instance of the ProcessPoolExecutor class for every request that needs to be processed and have it terminated (using the with statement) right after the processing is completed. Note, however, that creating and destroying many processes over and over could become computationally expensive. Example:

    async def process_requests(q: asyncio.Queue):
        while True:
            # ...
            with ProcessPoolExecutor() as pool:
                r = await loop.run_in_executor(pool, cpu_bound_task, item)
            # ...