Tags: python, caching, python-asyncio, fastapi, nest-asyncio

Caching results in an async environment


I am working on a FastAPI endpoint that performs an I/O-bound operation, which is async for efficiency. However, it takes time, so I would like to cache the result and reuse it for a period of time.

Currently I have this:

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def _get_expensive_resource(key) -> bool:
    await asyncio.sleep(2)
    return True

@app.get('/')
async def get(key):
    return await _get_expensive_resource(key)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test:app")

I am trying to use the cachetools package to cache the results and I have tried something like the following:

import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
  
app = FastAPI()

async def _get_expensive_resource(key) -> bool:
    await asyncio.sleep(2)
    return True

class ResourceCache(TTLCache):
    def __missing__(self, key):
        loop = asyncio.get_event_loop()
        resource = loop.run_until_complete(_get_expensive_resource(key))
        self[key] = resource
        return resource

resource_cache = ResourceCache(124, 300)

@app.get('/')
async def get(key: str):
    return resource_cache[key]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test2:app")

However, this fails because, as far as I understand, the __missing__ method is sync, and you can't re-enter the event loop from sync code that is itself being called from async code. The error is:

RuntimeError: this event loop is already running.

A similar error happens if I use plain asyncio instead of uvloop.
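The RuntimeError above can be reproduced without FastAPI at all. A minimal sketch (plain asyncio, no uvloop; not from the original post) showing that `run_until_complete` refuses to run on a loop that is already running:

```python
import asyncio

async def inner() -> bool:
    return True

async def outer() -> str:
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # Re-entering the already-running loop is not allowed
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # silence the "never awaited" warning
        return str(exc)
    return ""

error_message = asyncio.run(outer())
```

This is exactly the situation inside `__missing__` when it is reached from an async endpoint: the loop FastAPI started is already running, so it cannot be driven again from sync code.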

For the asyncio event loop, I have tried the nest_asyncio package, but it does not patch uvloop, and even with plain asyncio the service seems to freeze after the first use.

Do you have any idea how I could accomplish this?


Solution

  • Auto-answering for others who come across this (including myself in fifteen days):

    TTLCache works like a normal Python dictionary: accessing a missing key calls the __missing__ method. So we use the value in the dictionary if it is present, and if not, we fetch the resource in this method. The method should also set the key in the cache (so it will be present next time) and return the value for use this time.

    class ResourceCache(TTLCache):
        def __missing__(self, key) -> asyncio.Task:
            # Schedule the coroutine on the already-running loop;
            # the Task starts executing in the background immediately.
            resource_future = asyncio.create_task(_get_expensive_resource(key))
            self[key] = resource_future
            return resource_future
    

    So, we have a cache (essentially a dictionary) that maps keys to asyncio.Tasks. The tasks are executed asynchronously in the event loop (which FastAPI has already started!). When we need the result, we can await it in the endpoint code, or actually anywhere, as long as it's inside an async function!

    @app.get("/")
    async def get(key: str) -> bool:
        return await resource_cache[key]
    

    Calling this endpoint a second time (within the cache's TTL) will reuse the cached resource (mocked with `True` in our example).
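    To see the whole pattern working end to end without a web server, here is a self-contained sketch. It uses a plain dict subclass instead of TTLCache so it has no third-party dependencies (the __missing__ mechanism is identical; only the expiry behaviour is missing), and a module-level call counter to show that the expensive coroutine runs only once:

    ```python
    import asyncio

    CALLS = 0

    async def _get_expensive_resource(key) -> bool:
        global CALLS
        CALLS += 1            # count how often the expensive work actually runs
        await asyncio.sleep(0.01)
        return True

    # Plain dict stands in for TTLCache to keep the sketch dependency-free;
    # with cachetools installed, subclass TTLCache(maxsize, ttl) instead.
    class ResourceCache(dict):
        def __missing__(self, key) -> asyncio.Task:
            task = asyncio.create_task(_get_expensive_resource(key))
            self[key] = task
            return task

    async def main():
        cache = ResourceCache()
        first = await cache["a"]   # triggers __missing__, schedules the Task
        second = await cache["a"]  # cache hit: awaits the same (finished) Task
        return first, second

    results = asyncio.run(main())
    ```

    Awaiting the same Task twice is safe: a finished Task simply yields its stored result again, which is what makes storing Tasks (rather than raw results) in the cache work.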