pythonpython-asyncio

Communication between async tasks and synchronous threads in python


I am looking for the best solution for communication between async tasks and methods/functions that run in a thread pool executor from concurrent.futures. In previous synchronous projects, I would use the queue.Queue class. I assume that any method should be thread safe and therefore the asyncio.queue will not work.

I have seen people extend the queue.Queue class to do something like:

class async_queue(Queue):
  async def aput(self, item):
    self.put_nowait(item)

  async def aget(self):
    resp = await asyncio.get_event_loop().run_in_executor( None, self.get )
    return resp

Is there a better way?


Solution

  • I would recommend going the other way around: using the asyncio.Queue class to communicate between the two worlds. This has the advantage of not having to spend a slot in the thread pool on operations that take a long time to complete, such as a get().

    Here is an example:

    class Queue:
        def __init__(self):
            self._loop = asyncio.get_running_loop()
            self._queue = asyncio.Queue()
    
        def sync_put_nowait(self, item):
            self._loop.call_soon(self._queue.put_nowait, item)
    
        def sync_put(self, item):
            asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()
    
        def sync_get(self):
            return asyncio.run_coroutine_threadsafe(self._queue.get(item), self._loop).result()
    
        def async_put_nowait(self, item):
            self._queue.put_nowait(item)
    
        async def async_put(self, item):
            await self._queue.put(item)
    
        async def async_get(self):
            return await self._queue.get()
    

    The methods prefixed with sync_ are meant to be invoked by sync code (running outside the event loop thread). The ones prefixed with async_ are to be called by code running in the event loop thread, regardless of whether they are actually coroutines. (put_nowait, for example, is not a coroutine, but it still must be distinguished between a sync and an async version.)