python, python-asyncio, fastapi, starlette, apachebench

FastAPI: Performance results differ between run_in_threadpool() and run_in_executor() with ThreadPoolExecutor


Here's a minimal reproducible example of my FastAPI app. I'm seeing some strange behavior and I'm not sure I understand the reason.

I'm using ApacheBench (ab) to send multiple requests as follows:

ab -n 1000 -c 50 -H 'accept: application/json' -H 'x-data-origin: source' 'http://localhost:8001/test/async'

FastAPI app

import time
import asyncio
import enum
from typing import Any

from fastapi import FastAPI, Path, Body
from starlette.concurrency import run_in_threadpool

app = FastAPI()
loop = asyncio.get_running_loop()
def sync_func() -> None:
    time.sleep(3)
    print("sync func")

async def sync_async_with_fastapi_thread() -> None:
    await run_in_threadpool(time.sleep, 3)
    print("sync async with fastapi thread")

async def sync_async_func() -> None:
    await loop.run_in_executor(None, time.sleep, 3)

async def async_func() -> Any:
    await asyncio.sleep(3)
    print("async func")

@app.get("/test/sync")
def test_sync() -> None:
    sync_func()
    print("sync")

@app.get("/test/async")
async def test_async() -> None:
    await async_func()
    print("async")

@app.get("/test/sync_async")
async def test_sync_async() -> None:
    await sync_async_func()
    print("sync async")

@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
    await sync_async_with_fastapi_thread()
    print("sync async with fastapi thread")

Here are the ApacheBench results (full ab output omitted; each test was run with Concurrency Level: 50):

  • async (asyncio.sleep)
  • sync (time.sleep)
  • sync_async (time.sleep with run_in_executor)
  • sync_async_fastapi (time.sleep with run_in_threadpool)

In conclusion, I'm seeing a surprising disparity in the results, especially when using run_in_executor, where the average time per request is significantly higher (around 12 seconds). I don't understand this outcome.

--- EDIT --- After AKX's answer.

Here's the code working as expected:

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool
from anyio import to_thread

app = FastAPI()
to_thread.current_default_thread_limiter().total_tokens = 200
loop = asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=100)
def sync_func() -> None:
    time.sleep(3)
    print("sync func")

async def sync_async_with_fastapi_thread() -> None:
    await run_in_threadpool(time.sleep, 3)
    print("sync async with fastapi thread")

async def sync_async_func() -> None:
    await loop.run_in_executor(executor, time.sleep, 3)

async def async_func() -> Any:
    await asyncio.sleep(3)
    print("async func")

@app.get("/test/sync")
def test_sync() -> None:
    sync_func()
    print("sync")

@app.get("/test/async")
async def test_async() -> None:
    await async_func()
    print("async")

@app.get("/test/sync_async")
async def test_sync_async() -> None:
    await sync_async_func()
    print("sync async")

@app.get("/test/sync_async_fastapi")
async def test_sync_async_with_fastapi_thread() -> None:
    await sync_async_with_fastapi_thread()
    print("sync async with fastapi thread")

Solution

  • Using run_in_threadpool()

    FastAPI is fully compatible with (and based on) Starlette, and hence, with FastAPI you get all of Starlette's features, such as the run_in_threadpool() method. Starlette's run_in_threadpool(), which uses anyio.to_thread.run_sync() behind the scenes, "will run the sync blocking function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked"—see this answer and AnyIO's Working with threads documentation for more details. Calling run_in_threadpool()—which internally calls anyio.to_thread.run_sync(), and subsequently, AsyncIOBackend.run_sync_in_worker_thread()—will return a coroutine that will then be awaited to get the eventual result of the sync function (e.g., result = await run_in_threadpool(...)), and hence, FastAPI will still work asynchronously (instead of calling that synchronous function directly, which would block the event loop that runs in the main thread, and hence, the main thread would get blocked as well). As can be seen in Starlette's source code (link is given above), the run_in_threadpool() function simply looks like this (supporting both sequence and keyword arguments):

    async def run_in_threadpool(
        func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
    ) -> T:
        if kwargs:  # pragma: no cover
            # run_sync doesn't accept 'kwargs', so bind them in here
            func = functools.partial(func, **kwargs)
        return await anyio.to_thread.run_sync(func, *args)
    

    As described in AnyIO's documentation:

    Adjusting the default maximum worker thread count

    The default AnyIO worker thread limiter has a value of 40, meaning that any calls to to_thread.run_sync() without an explicit limiter argument will cause a maximum of 40 threads to be spawned. You can adjust this limit like this:

    from anyio import to_thread
    
    async def foo():
        # Set the maximum number of worker threads to 60
        to_thread.current_default_thread_limiter().total_tokens = 60
    

    Note

    AnyIO’s default thread pool limiter does not affect the default thread pool executor on asyncio.

    Since FastAPI uses Starlette's concurrency module to run blocking functions in an external threadpool (the same threadpool is also used by FastAPI to run endpoints defined with normal def instead of async def, as described in this answer), the default value of the thread limiter, as shown above, is applied here as well, i.e., 40 threads maximum—see the relevant AsyncIOBackend.current_default_thread_limiter() method that returns the CapacityLimiter with the default number of threads. Hence, sending 50 requests simultaneously, as in your case, would lead to threadpool starvation, meaning that there wouldn't be enough threads available in the threadpool to handle all incoming requests concurrently.
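
    As a quick sanity check, you could read that default limit at runtime. Below is a minimal sketch (the function name is made up for illustration, and it assumes it is called from within an async context, e.g., inside an async def endpoint); it should print 40 unless the limit has been adjusted:

    from anyio import to_thread
    
    
    async def print_default_thread_limit() -> None:
        limiter = to_thread.current_default_thread_limiter()
        # total_tokens is the maximum number of worker threads available to
        # to_thread.run_sync()/run_in_threadpool(); it defaults to 40
        print(limiter.total_tokens)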

    As described earlier, one can adjust that value, thus increasing the number of threads, which might lead to an improvement in performance results—always depending on the number of requests to def endpoints (or async def endpoints that call run_in_threadpool() inside) your API is expected to serve concurrently. For instance, if you expect the API to serve no more than 50 requests at a time to such endpoints, then set the maximum number of threads to 50. Note: If your FastAPI application also uses synchronous/blocking background tasks and/or StreamingResponse's generators (i.e., functions defined with normal def instead of async def) and/or UploadFile's async methods, such as read/write/etc. (all these methods call the corresponding synchronous def file methods underneath, using run_in_threadpool()), you could then increase the number of threads as required, as FastAPI actually runs all those functions in the same external threadpool as well—it is all explained in this answer in detail.

    Note that using the approach below, which was described here, would have the same effect on adjusting the number of worker threads:

    from anyio.lowlevel import RunVar
    from anyio import CapacityLimiter
    
    RunVar("_default_thread_limiter").set(CapacityLimiter(60))
    

    But, it would be best to follow the approach provided by AnyIO's official documentation (as shown earlier). It is also a good idea to have this done when the application starts up, using a lifespan event handler, as demonstrated here.

    In the working example below, since the /sync endpoint is defined with normal def instead of async def, FastAPI will run it in a separate thread from the external threadpool and await it, thus ensuring the event loop (and hence, the main thread and the entire server) does not get blocked due to the blocking operations (either blocking IO-bound or CPU-bound) that will be performed inside that endpoint.

    Working Example 1

    from fastapi import FastAPI
    from contextlib import asynccontextmanager
    from anyio import to_thread
    import time
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):    
        to_thread.current_default_thread_limiter().total_tokens = 60
        yield
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    @app.get("/sync")
    def test_sync() -> None:
        time.sleep(3)
        print("sync")
    
    
    @app.get('/get_available_threads')
    async def get_available_threads():
        return to_thread.current_default_thread_limiter().available_tokens
    

    Using ApacheBench, you could test the example above as follows, which will send 1000 requests in total with 50 being sent simultaneously at a time (-n: Number of requests, -c : Number of concurrent requests):

    ab -n 1000 -c 50 "http://localhost:8000/sync"
    

    While running a performance test on the example above, if you call the /get_available_threads endpoint from your browser, e.g., http://localhost:8000/get_available_threads, you would see that the number of available threads is always 10 or above (since only 50 threads are used at a time in this test, but the thread limiter was set to 60), meaning that setting the maximum number of threads on AnyIO's thread limiter to a number that is well above your needs, like 200 as shown in some other answer and in your recent example, wouldn't bring any improvement in performance; on the contrary, you would end up with a number of threads "sitting" there without being used. As explained earlier, the maximum number of threads should depend on (1) the number of requests your API is expected to serve concurrently (i.e., the number of calls to def endpoints, or async def endpoints that call run_in_threadpool() inside), (2) any additional blocking tasks/functions that FastAPI itself would run in the threadpool under the hood, as well as (3) the server machine's available resources.

    The example below is the same as the one above, but instead of letting FastAPI itself handle the blocking operation(s) inside the def endpoint (by running the def endpoint in the external threadpool and awaiting it), the endpoint is now defined with async def (meaning that FastAPI will run it directly in the event loop), and inside the endpoint, run_in_threadpool() (which returns an awaitable) is used to run the blocking operation (i.e., time.sleep() in the example). Performing a benchmark test on the example below would yield similar results to the previous example.

    Working Example 2

    from fastapi import FastAPI
    from fastapi.concurrency import run_in_threadpool
    from contextlib import asynccontextmanager
    from anyio import to_thread
    import time
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):    
        to_thread.current_default_thread_limiter().total_tokens = 60
        yield
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    @app.get("/sync_async_run_in_tp")
    async def test_sync_async_with_run_in_threadpool() -> None:
        await run_in_threadpool(time.sleep, 3)
        print("sync_async using FastAPI's run_in_threadpool")
    
    
    @app.get('/get_available_threads')
    async def get_available_threads():
        return to_thread.current_default_thread_limiter().available_tokens
    

    Using ApacheBench, you could test the example above as follows:

    ab -n 1000 -c 50 "http://localhost:8000/sync_async_run_in_tp"
    

  • Using loop.run_in_executor() with ThreadPoolExecutor

    When using asyncio's loop.run_in_executor()—after obtaining the running event loop using asyncio.get_running_loop()—one could pass None to the executor argument, which would lead to the default executor being used; that is, a ThreadPoolExecutor. Note that when calling loop.run_in_executor() and passing None to the executor argument, this does not create a new instance of a ThreadPoolExecutor every time you do that; instead, a ThreadPoolExecutor is only initialised the first time you do that, and for subsequent calls to loop.run_in_executor() with None passed to the executor argument, Python reuses that very same instance of ThreadPoolExecutor (hence, the default executor). This can be seen in the source code of loop.run_in_executor(). That means the number of threads that can be created, when calling await loop.run_in_executor(None, ...), is limited to the default number of thread workers in the ThreadPoolExecutor class.
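
    As a side note (not part of your original example), asyncio also provides loop.set_default_executor(), which lets you replace the default executor with a larger ThreadPoolExecutor at startup, so that subsequent calls passing None would use it instead. A rough sketch of that idea, assuming it runs inside the running event loop (e.g., in a lifespan handler):

    import asyncio
    import concurrent.futures
    
    
    async def configure_default_executor() -> None:
        loop = asyncio.get_running_loop()
        # From now on, loop.run_in_executor(None, ...) will use this executor
        loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=60))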

    As described in the documentation of ThreadPoolExecutor—and as shown in its implementation here—by default, the max_workers argument is set to None, in which case the number of worker threads is set based on the following formula: min(32, os.cpu_count() + 4). The os.cpu_count() function returns the number of logical CPUs in the current system. As explained in this article, physical cores refers to the number of CPU cores provided in the hardware (e.g., the chips), while logical cores is the number of CPU cores after hyperthreading is taken into account. If, for instance, your machine has 4 physical cores, each with hyperthreading (most modern CPUs have this), then Python will see 8 CPUs and will allocate 12 threads (8 CPUs + 4) to the pool by default (Python limits the number of threads to 32 to "avoid consuming surprisingly large resources on multi-core machines"; however, one could always adjust the max_workers argument on their own when using a custom ThreadPoolExecutor, instead of using the default one). You could check the default number of worker threads on your system as follows:

    import concurrent.futures
    
    # create a thread pool with the default number of worker threads
    pool = concurrent.futures.ThreadPoolExecutor()
    
    # report the number of worker threads chosen by default
    # Note: `_max_workers` is a protected variable and may change in the future
    print(pool._max_workers)
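
    Equivalently, you could compute that default directly from the formula above (a tiny sketch; note that the exact formula may vary slightly between Python versions):

    import os
    
    # Default max_workers used by ThreadPoolExecutor when none is specified
    print(min(32, (os.cpu_count() or 1) + 4))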
    

    Now, as shown in your original example, you are not using a custom ThreadPoolExecutor, but instead using the default ThreadPoolExecutor every time a request arrives, by calling await loop.run_in_executor(None, time.sleep, 3) (inside the sync_async_func() function, which is triggered by the /test/sync_async endpoint). Assuming your machine has 4 physical cores with hyperthreading enabled (as explained in the example earlier), the default number of worker threads for the default ThreadPoolExecutor would be 12. That means, based on your original example and the /test/sync_async endpoint that triggers await loop.run_in_executor(None, time.sleep, 3), your application could only handle 12 concurrent requests at a time. Roughly speaking, 12 threads each completing a 3-second task can serve about 4 requests per second, so with 50 requests in flight at once, the average time per request works out to around 50 / 4 = 12.5 seconds, which matches the ~12 seconds you observed (whereas 40 threads would give roughly 50 / (40 / 3) ≈ 3.75 seconds). That is the main reason for the difference observed in the performance results when compared to using run_in_threadpool(), which comes with 40 allocated threads by default. Even though, in both cases, threadpool starvation occurred when sending 50 requests simultaneously, the endpoint (in your example) that uses run_in_threadpool() performed better only because the default number of threads available was greater than the one used by the default ThreadPoolExecutor (in your other endpoint).

    One way to solve this is to create a new instance of ThreadPoolExecutor (on your own, instead of using the default executor) every time a request arrives and have it terminated once the task is completed (using the with statement), as shown below:

    import concurrent.futures
    import asyncio
    import time
    
    # inside an async def endpoint/coroutine:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        await loop.run_in_executor(pool, time.sleep, 3)
    

    Although the above should work just fine, it would be best to instantiate a ThreadPoolExecutor once at application startup, adjust the number of worker threads as needed, and reuse the executor when required. Having said that, depending on the blocking task and/or external libraries you might be using for that task, if you ever encounter a memory leak—i.e., memory that is no longer needed, but is not released—after tasks are completed when reusing a ThreadPoolExecutor, you might find creating a new instance of ThreadPoolExecutor each time, as shown above, more suitable. Note, however, that if this were a ProcessPoolExecutor instead, creating and destroying many processes over and over could become computationally expensive. Creating and destroying too many threads could consume a lot of memory as well.

    Below is a complete working example, demonstrating how to create a reusable custom ThreadPoolExecutor. Calling the /get_active_threads endpoint from your browser, e.g., http://localhost:8000/get_active_threads, while running a performance test with ApacheBench (using 50 concurrent requests, as described in your question and as shown below), you would see that the number of active threads never goes above 51 (50 concurrent threads + 1, which is the main thread), despite setting the max_workers argument to 60 in the example below. This is simply because, in this performance test, the application is never required to serve more than 50 requests at the same time. Also, ThreadPoolExecutor won't spawn new threads if idle threads are available (thus saving resources)—see the relevant implementation part. Hence, again, initialising the ThreadPoolExecutor with max_workers=100, as shown in your recent update, would be unnecessary, if you never expect your FastAPI application to serve more than 50 requests at a time (to endpoints where that ThreadPoolExecutor is used).

    Working Example

    from fastapi import FastAPI, Request
    from contextlib import asynccontextmanager
    import concurrent.futures
    import threading
    import asyncio
    import time
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):    
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=60)
        yield {'pool': pool}
        pool.shutdown()
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    @app.get("/sync_async")
    async def test_sync_async(request: Request) -> None:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(request.state.pool, time.sleep, 3)  
        print("sync_async")
    
    
    @app.get('/get_active_threads')
    async def get_active_threads():
        return threading.active_count()
    

    Using ApacheBench, you could test the example above as follows:

    ab -n 1000 -c 50 "http://localhost:8000/sync_async"
    

    Final Notes

    In general, you should always aim to use asynchronous code (i.e., using async/await), wherever possible, as async code, also known as coroutines, runs directly in the event loop—the event loop runs in the main thread and executes all tasks in that thread. That means there is only one thread that can take a lock on the interpreter; thus, avoiding the additional overhead of context switching (i.e., the CPU jumping from one thread of execution to another). When dealing with sync blocking IO-bound tasks, though, you could either (1) define your endpoint with def and let FastAPI handle it behind the scenes as described earlier, as well as in this answer, or (2) define your endpoint with async def and use run_in_threadpool() on your own to run that blocking task in a separate thread and await it, or (3) define your endpoint with async def and use asyncio's loop.run_in_executor() with a custom (preferably reusable) ThreadPoolExecutor, adjusting the number of worker threads as required. When required to perform blocking CPU-bound tasks, while running such tasks in a separate thread from an external threadpool and awaiting them would successfully prevent the event loop from getting blocked, it wouldn't, however, provide the performance improvement you would expect from running code in parallel. Thus, for CPU-bound tasks, one may choose to use a ProcessPoolExecutor instead (Note: when using processes in general, you need to explicitly protect the entry point with if __name__ == '__main__')—an example of using a ProcessPoolExecutor can be found in this answer.
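
    For reference, below is a minimal, self-contained sketch of the ProcessPoolExecutor approach (illustrative only; the cpu_bound_task() function is made up for the example, and this is not the exact code from the linked answer):

    import asyncio
    import concurrent.futures
    
    
    def cpu_bound_task() -> int:
        # Placeholder for some heavy CPU-bound computation
        return sum(i * i for i in range(10_000_000))
    
    
    async def main() -> None:
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            # Run the CPU-bound task in a separate process, without blocking the event loop
            result = await loop.run_in_executor(pool, cpu_bound_task)
            print(result)
    
    
    if __name__ == '__main__':  # protecting the entry point is required when spawning processes
        asyncio.run(main())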

    To run tasks in the background, without waiting for them to complete before proceeding with the rest of the code in an endpoint, you could use FastAPI's BackgroundTasks, as shown here and here. If the background task function is defined with async def, FastAPI will run it directly in the event loop, whereas if it is defined with normal def, FastAPI will use run_in_threadpool() and await the returned coroutine (same concept as API endpoints). Another option, when you need to run an async def function in the background but not necessarily have it triggered after returning a FastAPI response (which is the case with BackgroundTasks), is to use asyncio.create_task(), as shown in this answer and this answer. If you need to perform heavy background computation and you don't necessarily need it to be run by the same process, you may benefit from using other, bigger tools, such as Celery.
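
    Below is a rough sketch of those two options side by side (illustrative only; the endpoint and the notify() task are made up for the example):

    import asyncio
    from fastapi import BackgroundTasks, FastAPI
    
    app = FastAPI()
    
    
    async def notify(msg: str) -> None:
        await asyncio.sleep(1)
        print(msg)
    
    
    @app.get('/send')
    async def send(background_tasks: BackgroundTasks):
        # Option 1: runs only after the response has been sent
        background_tasks.add_task(notify, 'via BackgroundTasks')
        # Option 2: scheduled on the event loop right away, independently of the response
        # (in real code, keep a reference to the task so it isn't garbage collected)
        task = asyncio.create_task(notify('via asyncio.create_task'))
        return {'status': 'scheduled'}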

    Finally, regarding the optimal/maximum number of worker threads, I would suggest reading this article (have a look at this article as well for more details on ThreadPoolExecutor in general). As explained in the article:

    It is important to limit the number of worker threads in the thread pools to the number of asynchronous tasks you wish to complete, based on the resources in your system, or on the number of resources you intend to use within your tasks.

    Alternately, you may wish to increase the number of worker threads dramatically, given the greater capacity in the resources you intend to use.

    [...]

    It is common to have more threads than CPUs (physical or logical) in your system. The reason for this is that threads are used for IO-bound tasks, not CPU-bound tasks. This means that threads are used for tasks that wait for relatively slow resources to respond, like hard drives, DVD drives, printers, network connections, and much more.

    Therefore, it is not uncommon to have tens, hundreds and even thousands of threads in your application, depending on your specific needs. It is unusual to have more than one or a few thousand threads. If you require this many threads, then alternative solutions may be preferred, such as AsyncIO.

    Also, in the same article:

    Does the Number of Threads in the ThreadPoolExecutor Match the Number of CPUs or Cores?

    The number of worker threads in the ThreadPoolExecutor is not related to the number of CPUs or CPU cores in your system.

    You can configure the number of threads based on the number of tasks you need to execute, the amount of local system resources you have available (e.g., memory), and the limitations of resources you intend to access within your tasks (e.g., connections to remote servers).

    How Many Threads Should I Use?

    If you have hundreds of tasks, you should probably set the number of threads to be equal to the number of tasks.

    If you have thousands of tasks, you should probably cap the number of threads at hundreds or 1,000.

    If your application is intended to be executed multiple times in the future, you can test different numbers of threads and compare overall execution time, then choose a number of threads that gives approximately the best performance. You may want to mock the task in these tests with a random sleep operation.

    What Is the Maximum Number of Worker Threads in the ThreadPoolExecutor?

    There is no maximum number of worker threads in the ThreadPoolExecutor.

    Nevertheless, your system will have an upper limit of the number of threads you can create based on how much main memory (RAM) you have available.

    Before you exceed main memory, you will reach a point of diminishing returns in terms of adding new threads and executing more tasks. This is because your operating system must switch between the threads, called context switching. With too many threads active at once, your program may spend more time context switching than actually executing tasks.

    A sensible upper limit for many applications is hundreds of threads to perhaps a few thousand threads. More than a few thousand threads on a modern system may result in too much context switching, depending on your system and on the types of tasks that are being executed.