I have a FastAPI service that uses a library which performs I/O.
I converted the library to be fully async and updated the unit tests; all of them pass.
Now, when I try to use that library from my FastAPI service with async/await,
I get the following error:
File "/Users/ferozed/stash/services/validation-api/validation_api/main.py", line 5, in <module>
from .routers import root
File "/Users/ferozed/stash/services/validation-api/validation_api/routers/root.py", line 4, in <module>
from ..api import *
File "/Users/ferozed/stash/services/validation-api/validation_api/api.py", line 5, in <module>
from .singletons import SINGLETONS
File "/Users/ferozed/stash/services/validation-api/validation_api/singletons.py", line 31, in <module>
SINGLETONS = Singleton()
File "/Users/ferozed/stash/services/validation-api/validation_api/singletons.py", line 8, in __init__
self._event_registry_cache = event_registry_cache()
File "/Users/ferozed/stash/services/validation-api/.venv/lib/python3.9/site-packages/event_validator/event_registry/event_registry_cache.py", line 20, in __init__
self._client = event_registry_client()
File "/Users/ferozed/stash/services/validation-api/.venv/lib/python3.9/site-packages/event_validator/event_registry/event_registry_client.py", line 27, in __init__
self._session = aiohttp.ClientSession()
File "/Users/ferozed/stash/services/validation-api/.venv/lib/python3.9/site-packages/aiohttp/client.py", line 294, in __init__
loop = loop or asyncio.get_running_loop()
I am initializing the objects in a singleton as follows:
class Singleton(object):
    def __init__(self):
        # Builds the cache eagerly from a synchronous constructor; per the
        # traceback above, this call is on the chain that ends in
        # aiohttp.ClientSession() being created.
        self._event_registry_cache = event_registry_cache()
The event_registry_cache() constructor initializes an event_registry_client,
whose constructor in turn tries to create an aiohttp.ClientSession().
This is the part that throws the exception.
Here is a simplified repro:
import asyncio
import aiohttp
class client(object):
    def __init__(self):
        # Runs synchronously from module-level code; the traceback below
        # shows this line raising "RuntimeError: no running event loop".
        self._session = aiohttp.ClientSession()

    async def do(self):
        # NOTE(review): the corrected example later in this document uses
        # `async with` and `await resp.text()` here — confirm against aiohttp.
        with self._session.get('http://www.google.com') as resp:
            await resp.text
class singleton(object):
    def __init__(self):
        # Constructs the client eagerly, which in turn creates the
        # aiohttp session inside client.__init__.
        self._client = client()

    async def do(self):
        # Forwards to the wrapped client's coroutine.
        await self._client.do()
# Built at module level, before any event loop is started; the traceback
# below shows the failure originating from this line.
SINGLETON = singleton()

if __name__ == '__main__':
    # NOTE(review): asyncio.get_running_loop() is documented to raise
    # RuntimeError when no loop is running rather than return a falsy value,
    # so the `or` fallback is presumably never reached — confirm.
    loop = asyncio.get_running_loop() or asyncio.new_event_loop()
    loop.run_until_complete(SINGLETON.do())
This returns the same error:
Traceback (most recent call last):
File "/Users/ferozed/stash/services/clickstream-event-validation-api/clickstream_event_validation_api/repro.py", line 21, in <module>
SINGLETON = singleton()
File "/Users/ferozed/stash/services/clickstream-event-validation-api/clickstream_event_validation_api/repro.py", line 16, in __init__
self._client = client()
File "/Users/ferozed/stash/services/clickstream-event-validation-api/clickstream_event_validation_api/repro.py", line 7, in __init__
self._session = aiohttp.ClientSession()
File "/Users/ferozed/stash/services/clickstream-event-validation-api/.venv/lib/python3.9/site-packages/aiohttp/client.py", line 294, in __init__
loop = loop or asyncio.get_running_loop()
RuntimeError: no running event loop
Any ideas what I am doing wrong?
One guess is that I can make the session initialization lazy so that it is always initialized inside of a loop.
But is there any way to get this working without lazy init?
You're creating aiohttp.ClientSession() inside __init__, which is not async and here runs at import time,
before any event loop is running. aiohttp.ClientSession()
needs to be created within a running event loop.
You can defer that initialization to FastAPI's startup
event, which runs inside the event loop.
Here is my example code:
import aiohttp
from fastapi import FastAPI
app = FastAPI()
class client(object):
    """Thin wrapper around a lazily created ``aiohttp.ClientSession``.

    The session is deliberately not created in ``__init__`` so that an
    instance can be constructed from synchronous code (for example at import
    time), before any event loop is running.
    """

    def __init__(self):
        # Filled in by initialize(); stays None until then.
        self._session = None

    async def initialize(self):
        """Create the HTTP session unless a usable one already exists.

        Safe to call more than once: an existing open session is kept, so a
        repeated call does not replace (and thereby leak) the previous one.
        """
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()

    async def close(self):
        """Close the HTTP session, if any, and return to the uninitialized state.

        Calling this on an instance that was never initialized is a no-op.
        """
        # Detach first so the instance never holds a reference to a session
        # that is in the middle of being closed.
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def do(self):
        """Fetch ``http://www.google.com`` and return the response body as text."""
        # Create the session on demand so a call made before initialize()
        # does not fail with AttributeError on None.
        await self.initialize()
        async with self._session.get('http://www.google.com') as resp:
            return await resp.text()
class singleton:
    """Holds one ``client`` instance and forwards calls to it."""

    def __init__(self):
        self._client = client()

    async def initialize(self):
        """Await the wrapped client's ``initialize()``."""
        await self._client.initialize()

    async def do(self):
        """Await the wrapped client's ``do()`` and return its result."""
        result = await self._client.do()
        return result
# Constructing this at import time is fine here: client.__init__ only sets
# its session to None, and the aiohttp session is created later in initialize().
SINGLETON = singleton()
@app.on_event("startup")
async def startup_event():
    """Initialize the shared singleton from the application's startup hook.

    This is a coroutine, so the aiohttp session it ends up creating is built
    while an event loop is running.
    """
    await SINGLETON.initialize()
@app.get("/")
async def read_root():
    """Handle ``GET /`` by returning whatever the shared singleton's ``do()`` yields."""
    body = await SINGLETON.do()
    return body