I am working in a FastAPI endpoint that make a I/O bound operation, which is async for efficiency. However, it takes time, so I would like to cache the results to reuse it for a period of time.
I currently I have this:
from fastapi import FastAPI
import asyncio
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
@app.get('/')
async def get(key):
return await _get_expensive_resource(key)
if __name__ == "__main__":
import uvicorn
uvicorn.run("test:app")
I am trying to use the cachetools
package to cache the results and I have tried something like the following:
import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
class ResourceCache(TTLCache):
def __missing__(self, key):
loop = asyncio.get_event_loop()
resource = loop.run_until_complete(_get_expensive_resource(key))
self[key] = resource
return resource
resource_cache = ResourceCache(124, 300)
@app.get('/')
async def get(key: str):
return resource_cache[key]
if __name__ == "__main__":
import uvicorn
uvicorn.run("test2:app")
However, this fails, because, as far as I understand, the __missing__
method is sync and you can't call async from sync from async. The error is:
RuntimeError: this event loop is already running.
Similar error happen if I use plain asyncio instead of uvloop.
For the asyncio event loop, I have tried using nest_asyncio
package, but it does not patch uvloop
and also, even when using it with asyncio, it seems like the service freezes after using it the first time.
Do you have any idea how could I acomplish this?
Auto-answering for other who come across this (including myself in fifteen days):
TTLCache
works like a normal python dictionary, accessing a missing key will call the __missing__
method. So, we would like to use the value in the dictonary if present, and if not, we can gather the resource in this method. This method should also set the key in the cache (so next time it will be present) and return the value for the use this time.
class ResourceCache(TTLCache):
def __missing__(self, key) -> asyncio.Task:
# Create a task
resource_future = asyncio.create_task(_get_expensive_resource(key))
self[key] = resource_future
return resource_future
So, we have a cache (essentially a dictionary) that maps keys to asyncio.Tasks. The tasks will be executed asynchronously in the event loop (which is already started by FastAPI!). And when we need the result we can await
for them in the endpoint code or actually anywhere, as long as its and async function!
@app.get("/")
async def get(key:str) -> bool:
return await resource_cache[key]
Calling this endpoint for a second time (within the timeout of the cache) will use the cached resource (in our example mocked with 'true').