I am looking for the best way to communicate between async tasks and methods/functions that run in a thread pool executor from concurrent.futures. In previous synchronous projects, I would use the queue.Queue class. I assume that any solution must be thread-safe, and therefore asyncio.Queue will not work.
I have seen people extend the queue.Queue class to do something like:
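    import asyncio
    from queue import Queue

    class async_queue(Queue):
        async def aput(self, item):
            # Never blocks, but raises queue.Full if the queue is bounded and full.
            self.put_nowait(item)

        async def aget(self):
            # Runs the blocking get() in the default executor, occupying one of its threads.
            resp = await asyncio.get_event_loop().run_in_executor(None, self.get)
            return resp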
Is there a better way?
I would recommend going the other way around: using the asyncio.Queue class to communicate between the two worlds. This has the advantage of not having to spend a slot in the thread pool on operations that take a long time to complete, such as a get().
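To make the cost of a thread pool slot concrete, here is a minimal sketch, assuming a pool limited to a single worker (the names `blocked` and `starved` are just illustrative). A blocking queue.Queue.get() submitted to the pool holds that worker, so a second job cannot start until an item arrives:

```python
import queue
from concurrent.futures import ThreadPoolExecutor, wait

q = queue.Queue()

with ThreadPoolExecutor(max_workers=1) as pool:
    # The blocking get() occupies the only worker until an item arrives.
    blocked = pool.submit(q.get)
    # This job is queued behind the blocked get() and cannot start yet.
    starved = pool.submit(lambda: "ran")

    # Give the second job a moment; it stays pending because no worker is free.
    done, not_done = wait([starved], timeout=0.2)
    starved_before_put = starved in not_done

    q.put("item")            # unblocks the worker
    got = blocked.result()   # the get() now returns
    ran = starved.result()   # only now does the second job run
```

With a larger pool the effect is less visible, but every pending get() still ties up one thread for as long as it waits.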
Here is an example of such a wrapper:
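    import asyncio

    class Queue:
        def __init__(self):
            # Must be constructed in the event loop thread while the loop is running.
            self._loop = asyncio.get_running_loop()
            self._queue = asyncio.Queue()

        def sync_put_nowait(self, item):
            # call_soon() is not thread-safe; other threads must use call_soon_threadsafe().
            self._loop.call_soon_threadsafe(self._queue.put_nowait, item)

        def sync_put(self, item):
            # Blocks the calling thread until the item has been enqueued.
            asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()

        def sync_get(self):
            # Blocks the calling thread until an item is available.
            return asyncio.run_coroutine_threadsafe(self._queue.get(), self._loop).result()

        def async_put_nowait(self, item):
            self._queue.put_nowait(item)

        async def async_put(self, item):
            await self._queue.put(item)

        async def async_get(self):
            return await self._queue.get()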
The methods prefixed with sync_ are meant to be invoked by sync code (running outside the event loop thread). The ones prefixed with async_ are to be called by code running in the event loop thread, regardless of whether they are actually coroutines. (put_nowait, for example, is not a coroutine, but it still needs separate sync and async versions, because the sync version has to hand the call over to the event loop thread.)
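As a rough usage sketch, a worker running in a ThreadPoolExecutor can exchange items with a coroutine through two such queues. The `worker` function, the `None` sentinel, and the doubling step are assumptions made for illustration only. The class is repeated in compact form so the snippet runs on its own:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class Queue:
    """Compact copy of the wrapper above (only the methods used here)."""

    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def sync_put(self, item):
        asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()

    def sync_get(self):
        return asyncio.run_coroutine_threadsafe(self._queue.get(), self._loop).result()

    async def async_put(self, item):
        await self._queue.put(item)

    async def async_get(self):
        return await self._queue.get()


def worker(requests, replies):
    # Hypothetical worker: runs in a pool thread, so it uses the sync_ methods.
    while True:
        item = requests.sync_get()
        if item is None:          # sentinel chosen for this sketch: stop the worker
            break
        replies.sync_put(item * 2)


async def main():
    # Both queues are created inside the running loop, as the wrapper requires.
    requests, replies = Queue(), Queue()
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        fut = loop.run_in_executor(pool, worker, requests, replies)
        results = []
        for n in (1, 2, 3):
            # Code in the event loop thread uses the async_ methods.
            await requests.async_put(n)
            results.append(await replies.async_get())
        await requests.async_put(None)
        await fut                 # wait for the worker to exit before closing the pool
    return results


results = asyncio.run(main())
print(results)  # [2, 4, 6]
```

Note that the sync_ methods must never be called from the event loop thread itself: .result() would block the loop that is supposed to run the scheduled coroutine, and the call would deadlock.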