I have the following code:
from fastapi import FastAPI, Request
import time

app = FastAPI()

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    time.sleep(5)
    print("bye")
    return {"ping": "pong!"}
If I run my code on localhost (e.g., http://localhost:8501/ping) in different tabs of the same browser window, I get:
Hello
bye
Hello
bye
instead of:
Hello
Hello
bye
bye
I have read about using httpx, but I still cannot get true parallelization. What's the problem?
As per FastAPI's documentation:
When you declare a path operation function (also known as endpoint) with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).
Also, as described here:
If you are using a third party library that communicates with something (a database, an API, the file system, etc.) and doesn't have support for using await, (this is currently the case for most database libraries), then declare your path operation functions as normally, with just def.
If your application (somehow) doesn't have to communicate with anything else and wait for it to respond, use async def.
If you just don't know, use normal def.
Note: You can mix def and async def in your path operation functions as much as you need and define each one using the best option for you. FastAPI will do the right thing with them.
Anyway, in any of the cases above, FastAPI will still work asynchronously and be extremely fast.
But by following the steps above, it will be able to do some performance optimizations.
Thus, in order to avoid blocking the server, a def endpoint in FastAPI (in the context of asynchronous programming, a function defined with just def is called a synchronous function) is not called directly in the event loop. Instead, FastAPI runs it in a separate thread from an external threadpool and awaits the result (more details on the external threadpool are given later on), and hence, FastAPI will still work asynchronously. In other words, the server will process requests to such endpoints concurrently (at the cost, though, of spawning a new thread, or reusing an existing thread from the threadpool, for every incoming request to such endpoints). In contrast, async def endpoints run directly in the event loop, which runs in a single thread (typically the main thread of a process/worker) and, in this case, is created when calling uvicorn.run() or the equivalent method of some other ASGI server. That is, the server will also process requests to such endpoints concurrently/asynchronously, as long as there is an await call to non-blocking operations inside such async def endpoints/routes; usually, these are I/O-bound operations, such as waiting for (1) data from the client to be sent through the network, (2) contents of a file on the disk to be read, and (3) a database operation to finish (see here).
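To make the distinction above more concrete, here is a minimal sketch that uses only the standard library (no FastAPI or Starlette involved). The dispatch() helper and the two handlers are hypothetical names used for illustration; the real framework code is more involved, so please treat this as a simplified picture of the idea rather than the actual implementation.

```python
import asyncio
import inspect
import threading


def sync_handler() -> int:
    # A plain `def` handler: report the id of the thread it runs in.
    return threading.get_ident()


async def async_handler() -> int:
    # An `async def` handler: also report the id of the thread it runs in.
    return threading.get_ident()


async def dispatch(handler) -> int:
    """Hypothetical, simplified dispatcher (not the framework's code)."""
    if inspect.iscoroutinefunction(handler):
        # Coroutine functions are awaited directly in the event loop.
        return await handler()
    # Plain functions are handed to a worker thread and the result is awaited.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, handler)


async def main() -> dict:
    loop_thread = threading.get_ident()
    return {
        "async_in_loop_thread": await dispatch(async_handler) == loop_thread,
        "sync_in_loop_thread": await dispatch(sync_handler) == loop_thread,
    }


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The async handler reports the event loop's own thread, whereas the plain def handler reports a worker thread, which is why a blocking call inside the latter does not stall the loop.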
However, if an endpoint defined with async def does not await some coroutine inside (a coroutine object is the result of calling an async def function), and so never gives up time for other tasks in the event loop to run (e.g., requests to the same or other endpoints, background tasks, etc.), each request to such an endpoint will have to be completely finished (i.e., exit the endpoint) before control returns to the event loop and other tasks are allowed to run (see this answer, if you would like to monitor all pending tasks in an event loop). In other words, in such cases, the server would be "blocked", and hence, any requests would be processed sequentially.
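The effect can be reproduced without a web server. The sketch below uses plain asyncio, with two coroutines standing in for two concurrent requests; blocking_handler, cooperative_handler and run_two are hypothetical names, and the 0.1 second delay is arbitrary.

```python
import asyncio
import time


async def blocking_handler(name: str, log: list) -> None:
    log.append(f"{name}: Hello")
    time.sleep(0.1)  # blocks the event loop; no other task can run meanwhile
    log.append(f"{name}: bye")


async def cooperative_handler(name: str, log: list) -> None:
    log.append(f"{name}: Hello")
    await asyncio.sleep(0.1)  # suspends this coroutine and yields to the loop
    log.append(f"{name}: bye")


async def run_two(handler) -> list:
    # Simulate two requests arriving at (around) the same time.
    log: list = []
    await asyncio.gather(handler("req1", log), handler("req2", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_two(blocking_handler)))
    print(asyncio.run(run_two(cooperative_handler)))
```

With the blocking handler, the first "request" logs both Hello and bye before the second one even starts; with the cooperative handler, both Hello messages are logged before either bye.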
Having said that, you should still consider defining an endpoint with async def if it does not execute any blocking operations that it has to wait on (e.g., time.sleep()), but is instead used to return simple JSON data, a simple HTMLResponse (see FastAPI docs as well) or even a FileResponse (in which case the file contents will be read asynchronously and in chunks regardless, using await anyio.open_file(), as can be seen in the relevant FileResponse implementation). This holds even if there is no await call inside the endpoint, as FastAPI would likely perform better when running such a simple endpoint directly in the event loop, rather than in a separate thread from the external threadpool (which would be the case if the endpoint were defined with normal def). If, however, you had to return some complex and large JSON data, either encoding them on your own within the endpoint, as shown in the linked answer earlier, or using Starlette's JSONResponse or FastAPI's ORJSONResponse/UJSONResponse (see this related answer as well), all of which encode the data synchronously, using json.dumps() and orjson.dumps()/ujson.dumps() respectively, you might consider having the endpoint defined with normal def (related answers could be found here and here). Alternatively, you could keep using an async def endpoint, but have such blocking operations (e.g., orjson.dumps() or df.to_json()) run in a separate thread/process, as described in the solutions provided later on. It would be good practice to run benchmark tests, similar to this, and compare the results to find the best-performing approach for your case.
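As a rough illustration of that last alternative, the sketch below hands a synchronous json.dumps() call to a worker thread from inside a coroutine, using asyncio.to_thread() (Python 3.9+). The function name encode_large_payload and the sample payload are assumptions made for the example. Whether this actually helps depends on the encoder and the payload size (an encoder that holds the GIL for the whole call can still starve the event loop), so it is something to benchmark rather than assume.

```python
import asyncio
import json


async def encode_large_payload(data: dict) -> str:
    # json.dumps() is synchronous; here it runs in a worker thread and the
    # coroutine awaits the result instead of calling it inline.
    return await asyncio.to_thread(json.dumps, data)


async def main() -> str:
    payload = {"items": list(range(1000))}  # stand-in for a large response body
    return await encode_large_payload(payload)


if __name__ == "__main__":
    print(len(asyncio.run(main())))
```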
Note that the same concept applies not only to endpoints, but also to functions that are used as StreamingResponse generators (see the StreamingResponse class implementation) or Background Tasks (see the BackgroundTask class implementation and this answer), as well as Dependencies. That means FastAPI, behind the scenes, will also run such functions defined with normal def in a separate thread from the same external threadpool; whereas, if such functions were defined with async def, they would run directly in the event loop. In order to run an endpoint or a function described above in a separate thread and await it, FastAPI uses Starlette's asynchronous run_in_threadpool() function, which, under the hood, calls anyio.to_thread.run_sync(). The default number of worker threads of that external threadpool is 40 and can be adjusted as required; please have a look at this answer for more details on the external threadpool and how to adjust the number of threads. Hence, after reading this answer to the end, you should be able to know when to define a FastAPI endpoint/StreamingResponse generator/BackgroundTask/Dependency with def or async def, as well as whether or not you should increase the number of threads of the external threadpool.
async def function and await
The keyword await (which only works within an async def function) passes function control back to the event loop. In other words, it suspends the execution of the surrounding coroutine and tells the event loop to let some other task run, until that awaited task is completed. Note that just because you may define a custom function with async def and then await it inside your async def endpoint, it doesn't mean that your code will work asynchronously, if that custom function contains, for example, calls to time.sleep(), CPU-bound tasks, non-async I/O libraries, or any other blocking call that is incompatible with asynchronous Python code. In FastAPI, for example, when using the async methods of UploadFile, such as await file.read() and await file.close(), FastAPI/Starlette, behind the scenes, actually calls the corresponding synchronous File methods in a separate thread from the external threadpool described earlier (using run_in_threadpool()) and awaits it; otherwise, such methods/operations would block the event loop. You could find out more by looking at the implementation of the UploadFile class.
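The pattern described above (an async method that simply delegates to a synchronous file method in a worker thread) could look roughly like the sketch below. AsyncFileWrapper is a hypothetical class written for this illustration with asyncio.to_thread() (Python 3.9+); it is not Starlette's UploadFile code, which uses run_in_threadpool() instead.

```python
import asyncio
import io


class AsyncFileWrapper:
    """Hypothetical, simplified stand-in for an upload object."""

    def __init__(self, file: io.BufferedIOBase) -> None:
        self.file = file  # the underlying synchronous file object

    async def read(self, size: int = -1) -> bytes:
        # Delegate the blocking read to a worker thread and await the result.
        return await asyncio.to_thread(self.file.read, size)

    async def close(self) -> None:
        # Closing is delegated to a worker thread in the same way.
        await asyncio.to_thread(self.file.close)


async def main() -> bytes:
    upload = AsyncFileWrapper(io.BytesIO(b"hello world"))
    try:
        return await upload.read()
    finally:
        await upload.close()


if __name__ == "__main__":
    print(asyncio.run(main()))
```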
Note that async does not mean parallel, but concurrent. As mentioned earlier, asynchronous code with async and await is often summarized as using coroutines. Coroutines are collaborative (or cooperatively multitasked), meaning that "at any given time, a program with coroutines is running only one of its coroutines, and this running coroutine suspends its execution only when it explicitly requests to be suspended" (see here and here for more info on coroutines).
As described in this article:
Specifically, whenever execution of a currently-running coroutine reaches an await expression, the coroutine may be suspended, and another previously-suspended coroutine may resume execution if what it was suspended on has since returned a value. Suspension can also happen when an async for block requests the next value from an asynchronous iterator or when an async with block is entered or exited, as these operations use await under the hood.
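The async for case from the quote can be seen in a small standard-library sketch. The names ticker and consume are made up for the example, and asyncio.sleep(0) is used only as an explicit suspension point inside the asynchronous iterator.

```python
import asyncio


async def ticker(name: str, count: int):
    for i in range(count):
        await asyncio.sleep(0)  # suspension point inside the async iterator
        yield f"{name}{i}"


async def consume(name: str, count: int, log: list) -> None:
    # Each request for the next value may suspend this coroutine,
    # giving the other consumer a chance to run.
    async for item in ticker(name, count):
        log.append(item)


async def main() -> list:
    log: list = []
    await asyncio.gather(consume("a", 2, log), consume("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['a0', 'b0', 'a1', 'b1']
```

The two consumers take turns because each one is suspended while its iterator awaits; neither runs in parallel with the other.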
If, however, a blocking I/O-bound or CPU-bound operation was directly executed/called inside an async def function/endpoint, it would then block the event loop, and hence, the main thread would be blocked as well. Hence, a blocking operation such as time.sleep() in an async def endpoint would block the entire server (as in the code example provided in your question). Thus, if your endpoint is not going to make any async calls, you could declare it with normal def instead, in which case, FastAPI would run it in a separate thread from the external threadpool and await it, as explained earlier (more solutions are given in the following sections). Example:
@app.get("/ping")
def ping(request: Request):
    # print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"
Otherwise, if the functions that you had to execute inside the endpoint are async functions that you had to await, you should define your endpoint with async def. To demonstrate this, the example below uses the asyncio.sleep() function (from the asyncio library), which provides a non-blocking sleep operation. Awaiting asyncio.sleep() will suspend the execution of the surrounding coroutine (until the sleep operation is completed), thus allowing other tasks in the event loop to run. A similar example is given here.
import asyncio

@app.get("/ping")
async def ping(request: Request):
    # print(request.client)
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"
Both the endpoints above will print out the specified messages to the screen in the same order as mentioned in your question—if two requests arrived at (around) the same time—that is:
Hello
Hello
bye
bye
When using a web browser to call the same endpoint for the second (third, and so on) time, please remember to do that from a tab that is isolated from the browser's main session; otherwise, succeeding requests (i.e., coming after the first one) might be blocked by the browser (on client side), as the browser might be waiting for a response to the previous request from the server, before sending the next request. This is a common behaviour for the Chrome web browser at least, due to waiting to see the result of a request and check if the result can be cached, before requesting the same resource again.
You could confirm that by using print(request.client) inside the endpoint, where you would see that the hostname and port number are the same for all incoming requests, in case the requests were initiated from tabs opened in the same browser window/session (otherwise, the port number would normally be different for every request). Hence, those requests would be processed sequentially by the server, because of the browser sending them sequentially in the first place. To overcome this, you could either:
Reload the same tab (as is running), or
Open a new tab in an (isolated) Incognito Window, or
Use a different web browser/client to send the request, or
Use the httpx library to make asynchronous HTTP requests, along with the awaitable asyncio.gather(), which allows executing multiple asynchronous operations concurrently and then returns a list of results in the same order the awaitables (tasks) were passed to that function (have a look at this answer for more details).
Example:
import httpx
import asyncio

URLS = ['http://127.0.0.1:8000/ping'] * 2

async def send(url, client):
    return await client.get(url, timeout=10)

async def main():
    async with httpx.AsyncClient() as client:
        tasks = [send(url, client) for url in URLS]
        responses = await asyncio.gather(*tasks)
        print(*[r.json() for r in responses], sep='\n')

asyncio.run(main())
In case you had to call different endpoints that may take different time to process a request, and you would like to print the response out on client side as soon as it is returned from the server (instead of waiting for asyncio.gather() to gather the results of all tasks and print them out in the same order the tasks were passed to it), you could replace the send() function of the example above with the one shown below:
async def send(url, client):
    res = await client.get(url, timeout=10)
    print(res.json())
    return res
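A different way to get results in completion order, swapped in here as an alternative technique rather than something the example above uses, is asyncio.as_completed(). The sketch below is self-contained and does not talk to a server: fake_request() is a hypothetical stand-in for client.get(), and the delays merely simulate endpoints that take different times to respond.

```python
import asyncio


async def fake_request(name: str, delay: float) -> str:
    # Stand-in for `await client.get(url)`; `delay` simulates processing time.
    await asyncio.sleep(delay)
    return name


async def main() -> list:
    requests = [fake_request("slow", 0.2), fake_request("fast", 0.05)]
    finished = []
    # as_completed() yields awaitables in the order the results become available.
    for next_done in asyncio.as_completed(requests):
        result = await next_done
        print(result)
        finished.append(result)
    return finished


if __name__ == "__main__":
    asyncio.run(main())
```

Here the "fast" result is printed first, even though it was submitted second.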
Async/await and Blocking I/O-bound or CPU-bound Operations
If you are required to define a FastAPI endpoint (or a StreamingResponse generator, a background task, etc.) with async def (as you might need to await some coroutines inside it), but also have some synchronous blocking I/O-bound or CPU-bound operation (computationally intensive task) that would block the event loop (essentially, the entire server) and wouldn't let other requests go through, for example:
from fastapi import File, UploadFile

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = cpu_bound_task(contents)  # this would block the event loop
    finally:
        await file.close()
    print("bye")
    return "pong"
then:
You should check whether you could change your endpoint's definition to normal def instead of async def. One way, if the only method in your endpoint that had to be awaited was the one reading the file contents, would be to declare the file contents parameter as bytes, i.e., contents: bytes = File(). Using that definition, FastAPI would read the file for you and you would receive the contents as bytes. Hence, there would be no need to use an async def endpoint with await file.read() inside. Please note that this approach (i.e., using contents: bytes = File()) should work fine for small files; however, for larger files, and always depending on your server's resources, this might cause issues, as the entire file contents would be stored in memory (see the documentation on File Parameters). Hence, if your system does not have enough RAM available to accommodate the accumulated data, your application may end up crashing. If, for instance, you have 8GB of RAM (the available RAM will always be less than the amount installed on your device, as other apps/services will be using it as well), you can't load a 50GB file.
Alternatively, you could use the file: UploadFile = File(...) definition in your endpoint, but this time call the synchronous .read() method of the SpooledTemporaryFile directly, which can be accessed through the .file attribute of the UploadFile object. In this way, you will be able to declare your endpoint with a normal def instead, and hence, each request will run in a separate thread from the external threadpool and then be awaited (as explained earlier). An example is given below. For more details on how to upload a File, as well as how FastAPI/Starlette uses the SpooledTemporaryFile behind the scenes when uploading a File, please have a look at this answer and this answer.
@app.post("/ping")
def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = file.file.read()
        res = cpu_bound_task(contents)
    finally:
        file.file.close()
    print("bye")
    return "pong"
Another way, when you would like to have the endpoint defined with normal def (as you need to run blocking operations inside and would like the endpoint to run in a separate thread instead of being called directly in the event loop), but at the same time you have to await coroutines, is to await such coroutines inside an async dependency instead, as explained and demonstrated in Update 2 of this answer, and have the result returned to the def endpoint.
Use FastAPI's (Starlette's) run_in_threadpool() function from the concurrency module (as @tiangolo suggested here), which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). run_in_threadpool is an awaitable function, where its first parameter is a normal function, and the following parameters are passed to that function directly. It supports both positional and keyword arguments.
from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)
Alternatively, use asyncio's loop.run_in_executor(), after obtaining the running event loop using asyncio.get_running_loop(), to run the task; in this case, you can await it to complete and return the result(s) before moving on to the next line of code. When None is passed to the executor argument, the default executor is used, which is a ThreadPoolExecutor:
import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)
or, if you would like to pass keyword arguments instead, you could use a lambda expression (e.g., lambda: cpu_bound_task(some_arg=contents)), or, preferably, functools.partial(), which is specifically recommended in the documentation for loop.run_in_executor():
import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
In Python 3.9+, you could also use asyncio.to_thread() to asynchronously run a synchronous function in a separate thread, which essentially uses await loop.run_in_executor(None, func_call) under the hood, as can be seen in the implementation of asyncio.to_thread(). The to_thread() function takes the blocking function to execute, as well as any arguments (*args and/or **kwargs) to the function, and then returns a coroutine that can be awaited. Example:
import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)
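To check that such a call really leaves the event loop free, one could run a small "heartbeat" task next to it, as in the self-contained sketch below. blocking_task() is a hypothetical stand-in that just sleeps (it plays the role of cpu_bound_task above), and the durations are arbitrary.

```python
import asyncio
import time


def blocking_task(duration: float, label: str = "done") -> str:
    # Hypothetical stand-in for a blocking call (e.g., a sync I/O library).
    time.sleep(duration)
    return label


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Records a tick roughly every 10 ms while the event loop stays responsive.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple:
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # Positional and keyword arguments are forwarded to the blocking function.
    result = await asyncio.to_thread(blocking_task, 0.2, label="pong")
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_task() were called directly inside main() instead, the heartbeat would not get a chance to tick until the call had returned.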
Note that, as explained in this answer, passing None to the executor argument does not create a new ThreadPoolExecutor every time you call await loop.run_in_executor(None, ...), but instead re-uses the default executor with the default number of worker threads (i.e., min(32, os.cpu_count() + 4)). Thus, depending on the requirements of your application, that number might not be enough. In that case, you should rather use a custom ThreadPoolExecutor. For instance:
import asyncio
import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)
I would strongly recommend having a look at the linked answer above to learn about the difference between using run_in_threadpool() and run_in_executor(), as well as how to create a re-usable custom ThreadPoolExecutor at the application startup, and adjust the number of maximum worker threads as needed.
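The general shape of a re-usable executor might look like the standard-library sketch below, which is not taken from the linked answer. The names blocking_io and handle_request are hypothetical, max_workers=8 is an arbitrary value, and in a real FastAPI application the pool would presumably be created and shut down in the application's startup/shutdown handling rather than inside main().

```python
import asyncio
import concurrent.futures
import time


def blocking_io(n: int) -> int:
    time.sleep(0.05)  # hypothetical blocking call
    return n * 2


async def handle_request(pool: concurrent.futures.Executor, n: int) -> int:
    # Every "request" submits its blocking work to the same shared pool.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, blocking_io, n)


async def main() -> list:
    # Created once and shared by all requests; max_workers is illustrative.
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
    try:
        return await asyncio.gather(*(handle_request(pool, n) for n in range(4)))
    finally:
        pool.shutdown(wait=True)  # released once, when the application stops


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4, 6]
```

Creating the pool once avoids paying the thread start-up cost on every call, which is what happens when a new executor is opened in a with block per request.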
ThreadPoolExecutor will successfully prevent the event loop from being blocked (and should be preferred for calling blocking I/O-bound tasks), but won't give you the performance improvement you would expect from running code in parallel, especially when one needs to perform CPU-bound tasks, such as audio or image processing and machine learning (see here). It is thus preferable to run CPU-bound tasks in a separate process, using ProcessPoolExecutor, as shown below, which, again, you can integrate with asyncio, in order to await it to finish its work and return the result(s). As described here, it is important to protect the entry point of the program to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__'.
import asyncio
import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)
Again, I'd suggest having a look at the linked answer earlier on how to create a re-usable ProcessPoolExecutor at application startup; you should find this answer helpful as well.
More solutions, as shown in this answer, include using asyncio.create_task() (if your task is actually async, but you wouldn't like to await it to complete), as well as spawning a new thread or process in the background, using the threading or multiprocessing modules, respectively, instead of using concurrent.futures. Moreover, if you had to perform some heavy background computation task that wouldn't necessarily have to be run by the same process (for example, you don't need to share memory, variables, etc.), you could also benefit from using other bigger tools like Celery, as described in FastAPI's documentation. Using apscheduler, as demonstrated in this answer, might be another option as well; always choose what suits you best.
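For the asyncio.create_task() option, a minimal fire-and-forget sketch could look as follows. The names send_notification, handler and background_tasks are hypothetical, and the set holding the tasks follows the usual advice to keep a reference to a task so that it is not garbage-collected before it finishes.

```python
import asyncio

background_tasks: set = set()


async def send_notification(log: list) -> None:
    await asyncio.sleep(0.05)  # hypothetical async work
    log.append("sent")


async def handler(log: list) -> str:
    task = asyncio.create_task(send_notification(log))
    # Keep a strong reference so the task is not garbage-collected mid-flight.
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "pong"  # returned before the background task has finished


async def main() -> tuple:
    log: list = []
    response = await handler(log)
    log_at_response = list(log)
    # Awaited here only so that the demo can observe the task completing.
    await asyncio.gather(*background_tasks)
    return response, log_at_response, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The handler returns "pong" while the log is still empty; the "sent" entry only appears once the background task has had time to run.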
Use more server workers to take advantage of multi-core CPUs, in order to run multiple processes in parallel and be able to serve more requests. For example, uvicorn main:app --workers 4. When using 1 worker, only one process is run. When using multiple workers, this will spawn multiple processes (all single threaded). Each process has a separate Global Interpreter Lock (GIL), as well as its own event loop, which runs in the main thread of each process and executes all tasks in its thread. That means, there is only one thread that can take a lock on the interpreter of each process; unless, of course, you employ additional threads, either outside or inside the event loop, e.g., when using run_in_threadpool, a custom ThreadPoolExecutor or defining endpoints/StreamingResponse generators/background tasks/dependencies with normal def instead of async def, as well as when calling UploadFile's methods (see the first two paragraphs of this answer for more details).
Note that each worker "has its own things, variables and memory". This means that global variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, note that "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".