I am facing multiple issues using an async ODM inside my Celery worker. First, I wasn't able to initialize my database models from a Celery worker signal. I am using Beanie for the database connection.
First Implementation
from asgiref.sync import async_to_sync
from asyncer import syncify
from beanie import init_beanie
from celery.signals import worker_ready
from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient(DATABASE_URL, uuidRepresentation="standard")
db = client[DB_NAME]


async def db_session():
    await init_beanie(
        database=db,
        document_models=[Project, User],
    )

@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
    logger.info('Startup celery worker process')
    async_to_sync(db_session)()
    logger.info('FINISHED : Startup celery worker process')

async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list


@celery_app.task
def pool_db():
    async_to_sync(get_users)()
    # syncify(get_users)() gives the same error: the User class is not initialized yet
    # (init_beanie should already have initialized all the models)
With this implementation, I could not access my database through the User and Project classes; it raises an error as if User and Project had not been initialized yet.
The workaround is to call db_session() at module level, which solves the problem with model initialization. But now, when I query the database from my Celery task, I get the following error:
RuntimeError: Event loop is closed
Second Implementation
from asgiref.sync import async_to_sync
from asyncer import syncify
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient(DATABASE_URL, uuidRepresentation="standard")
db = client[DB_NAME]


async def db_session():
    await init_beanie(
        database=db,
        document_models=[Project, User],
    )


# now init_beanie runs at module level
async_to_sync(db_session)()

async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list


@celery_app.task
def pool_db():
    # this raises RuntimeError('Event loop is closed')
    async_to_sync(get_users)()
    # syncify(get_users)() raises the same error
I am not very familiar with how asyncio is implemented, or with how asyncer and asgiref allow async code to run inside a sync thread, which has left me confused. Any help would be appreciated.
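One plausible reading of the RuntimeError, offered as an assumption rather than a confirmed diagnosis: when no event loop is already running, async_to_sync and syncify appear to run each coroutine on a fresh event loop that is closed when the call returns, while the database client stays tied to the loop it was first used on. The sketch below reproduces that pattern with the standard library only. LoopBoundClient is a hypothetical stand-in (not part of Motor or Beanie), and asyncio.run stands in for the sync wrappers.

```python
import asyncio
from typing import Optional


class LoopBoundClient:
    """Hypothetical stand-in for a client that remembers the loop it was initialized on."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    async def init(self) -> None:
        # Remember whichever loop happens to be running during initialization.
        self._loop = asyncio.get_running_loop()

    async def find(self) -> str:
        # Schedule work on the remembered loop, as a loop-bound client would.
        assert self._loop is not None
        fut = self._loop.create_future()
        self._loop.call_soon(fut.set_result, "row")
        return await fut


client = LoopBoundClient()

# First call: a new loop is created, used for init, then closed.
asyncio.run(client.init())

# Second call: a different loop runs the coroutine, but the client
# still points at the first loop, which is already closed.
try:
    asyncio.run(client.find())
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```

If this reading is right, calling db_session() at module level binds the client to a loop that no longer exists by the time the task runs.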
After a lot of investigation, using Flower to monitor the workers and logging the worker process IDs, it turns out that the Celery worker itself does not process any tasks. It spawns child processes that do (in my case, because I am using the default execution pool, which is prefork), while the signal handler (worker_ready.connect) runs only in the supervising Celery worker process and not in the children. Since processes are isolated memory-wise, the child processes have no access to the database connection or any other resources initialized there.

I am now using Celery with the gevent pool, which spawns only one process. My project does not have CPU-heavy tasks, so I don't need all the CPU power that the prefork pool provides.
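For anyone who wants to stay on the prefork pool, one possible approach, sketched under assumptions: keep a single long-lived event loop per child process and run every coroutine on it, instead of letting each call create and close its own loop. The helper names get_process_loop and run_sync are hypothetical, and current_loop_id is only a stand-in for a real query. In a real worker, the initialization coroutine (db_session) would presumably be run through run_sync from a handler connected to Celery's worker_process_init signal, which is dispatched in each pool child process, and the Motor client would be created there as well.

```python
import asyncio
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")

# One long-lived loop per process (hypothetical module-level holder).
_loop: Optional[asyncio.AbstractEventLoop] = None


def get_process_loop() -> asyncio.AbstractEventLoop:
    """Return this process's event loop, creating it on first use."""
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
    return _loop


def run_sync(coro: Coroutine[Any, Any, T]) -> T:
    """Run a coroutine to completion on the shared loop without closing it."""
    return get_process_loop().run_until_complete(coro)


async def current_loop_id() -> int:
    # Stand-in for a real query: reports which loop executed the coroutine.
    return id(asyncio.get_running_loop())


# Two separate sync calls end up on the same, still-open loop.
first = run_sync(current_loop_id())
second = run_sync(current_loop_id())
print(first == second, get_process_loop().is_closed())
```

A task such as pool_db could then call run_sync(get_users()). This assumes each child process handles one task at a time, since run_until_complete cannot be re-entered while the loop is already running.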