pythonpython-3.xpython-asyncio

Is there a way to use asyncio.Queue in multiple threads?


Let's assume I have the following code:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

The problem with this code is that the loop inside async coroutine is never finishing the first iteration, while queue size is increasing.

Why is this happening this way and what can I do to fix it?

I can't get rid of separate thread, because in my real code I use a separate thread to communicate with a serial device, and I haven't find a way to do that using asyncio.


Solution

  • asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus, which is a third-party library that provides a thread-aware asyncio queue.

    import asyncio
    import threading
    import janus
    
    def threaded(squeue):
        import time
        while True:
            time.sleep(2)
            squeue.put_nowait(time.time())
            print(squeue.qsize())
    
    @asyncio.coroutine
    def async_func(aqueue):
        while True:
            time = yield from aqueue.get()
            print(time)
    
    loop = asyncio.get_event_loop()
    queue = janus.Queue(loop=loop)
    asyncio.create_task(async_func(queue.async_q))
    threading.Thread(target=threaded, args=(queue.sync_q,)).start()
    loop.run_forever()
    

    There is also aioprocessing (full-disclosure: I wrote it), which provides process-safe (and as a side-effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.

    Edit

    As pointed it out in other answers, for simple use-cases you can use loop.call_soon_threadsafe to add to the queue, as well.