
Combining Celery Gevent Pool with asyncio (asgiref)


I have a lot of async code that handles websocket connections. The idea is simple: fetch data from a websocket, do some math, and perform a lot of I/O.

 data = await websocket.recv()
 # Do some math
 await obj.save()
 await redis.publish(data)
 await api_client.send(data)
 # And so on

This works perfectly in a standalone container.

But now I have received another task:

Build a Celery worker that will "re-check" data from a third-party API every 10 minutes and apply the same logic. So I have to put all of this into a Celery task. Currently I do something like this:

import asgiref.sync

@app.task
def sync_with_api():
    for obj in objects_to_sync:
        # Blocks until websocket_logic(obj) finishes, so objects run one by one
        asgiref.sync.async_to_sync(websocket_logic)(obj)

This works, but it processes the objects one at a time, and we expect thousands of objects to sync, so I am worried about performance.
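To illustrate why the loop above is slow, here is a minimal, standard-library-only sketch. It assumes the per-object work is dominated by I/O waits, simulated with `asyncio.sleep`; `fake_io_work`, `run_sequential`, `run_concurrent` and `timed` are hypothetical names, with `fake_io_work` standing in for `websocket_logic`. Awaiting each call in turn costs roughly the sum of the delays, while `asyncio.gather` overlaps the waits on a single event loop.

```python
import asyncio
import time


async def fake_io_work(obj_id: int, delay: float = 0.1) -> int:
    # Hypothetical stand-in for websocket_logic: pure waiting, no CPU work
    await asyncio.sleep(delay)
    return obj_id


async def run_sequential(ids):
    # One call at a time, like calling async_to_sync(...) inside a for loop
    return [await fake_io_work(i) for i in ids]


async def run_concurrent(ids):
    # All calls share one event loop, so their waits overlap
    return await asyncio.gather(*(fake_io_work(i) for i in ids))


def timed(coro_fn, ids):
    # Run the coroutine function on a fresh loop and measure wall-clock time
    start = time.perf_counter()
    result = asyncio.run(coro_fn(ids))
    return result, time.perf_counter() - start


if __name__ == "__main__":
    ids = list(range(20))
    _, seq = timed(run_sequential, ids)
    _, conc = timed(run_concurrent, ids)
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
```

With 20 simulated objects at 0.1 s each, the sequential version takes about 2 s and the concurrent one about 0.1 s. Real gains depend on how much of `websocket_logic` is actually waiting on I/O rather than computing.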

I thought about using the gevent pool and did this:

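import asgiref.sync

@app.task
def sync_objects():
    for obj in objects_to_sync:
        # Fan out: one task per object, picked up by the gevent workers.
        # Arguments must be serializable (JSON by default), so sending a
        # primary key is usually safer than sending a model instance.
        app.send_task("core.sync_task", args=[obj])

@app.task
def sync_task(obj):
    asgiref.sync.async_to_sync(websocket_logic)(obj)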

From my perspective, this should work better. But I am not sure whether combining gevent with asyncio is a good idea, because they are two different concurrency models. Has anyone had experience with problems like this?

Or should I just refactor the original code into two versions:

  1. An async version for the websockets
  2. A sync version for gevent

Solution

  • I ended up with this solution:

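    import asyncio

    async def sync_object(obj):
        await api_client.call()
        # some other logic

    async def sync_all(objects):
        # Run the per-object coroutines concurrently on one event loop
        await asyncio.gather(*(sync_object(obj) for obj in objects))

    @app.task
    def task_to_sync():
        # Evaluate the queryset in sync code, before entering the event loop
        objects = list(Objects.objects.all())
        # asyncio.run creates a fresh loop and closes it when done, so the
        # task can run again in the same worker process
        asyncio.run(sync_all(objects))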
    

    I have two Celery containers: one for I/O-bound background tasks, using gevent as the concurrency model, and a second one using prefork for ordinary and CPU-bound tasks. This task only runs once every 10 minutes, so this solution is good enough for it. I call it from the prefork worker, and it works well enough.
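With thousands of objects, gathering every coroutine at once starts thousands of API calls simultaneously, which may exceed a third-party API's rate limit or a client's connection pool. One common refinement, an assumption here rather than part of the solution above, is to cap the number of in-flight calls with `asyncio.Semaphore`. In this sketch `MAX_CONCURRENCY`, `process_one`, `sync_all_bounded` and `run_bounded` are hypothetical names, and `asyncio.sleep` stands in for the real `api_client.call()`.

```python
import asyncio

MAX_CONCURRENCY = 50  # hypothetical cap; tune it to the API's rate limits


async def process_one(obj, sem, stats):
    # At most MAX_CONCURRENCY coroutines can hold the semaphore at once
    async with sem:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        try:
            await asyncio.sleep(0.01)  # stand-in for `await api_client.call()`
            return obj
        finally:
            stats["in_flight"] -= 1


async def sync_all_bounded(objects, limit=MAX_CONCURRENCY):
    sem = asyncio.Semaphore(limit)
    # Counters only show that the cap holds; they are not needed in production
    stats = {"in_flight": 0, "peak": 0}
    # return_exceptions=True returns a failure as a result value instead of
    # raising it, so one bad object does not discard the other results
    results = await asyncio.gather(
        *(process_one(obj, sem, stats) for obj in objects),
        return_exceptions=True,
    )
    return results, stats["peak"]


def run_bounded(objects):
    # Would be the body of the Celery task; asyncio.run opens and closes a fresh loop
    return asyncio.run(sync_all_bounded(objects))
```

For example, `results, peak = run_bounded(list(range(200)))` processes all 200 items while `peak` never exceeds 50. The semaphore keeps the single-loop design of the solution and only limits how many calls overlap.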