Let's assume I have the following code:

    import asyncio
    import threading

    queue = asyncio.Queue()

    def threaded():
        import time
        while True:
            time.sleep(2)
            queue.put_nowait(time.time())
            print(queue.qsize())

    @asyncio.coroutine
    def async():
        while True:
            time = yield from queue.get()
            print(time)

    loop = asyncio.get_event_loop()
    asyncio.Task(async())
    threading.Thread(target=threaded).start()
    loop.run_forever()

The problem with this code is that the loop inside the async coroutine never finishes its first iteration, even though the queue size keeps increasing.
Why does this happen, and what can I do to fix it?
I can't get rid of the separate thread, because in my real code I use it to communicate with a serial device, and I haven't found a way to do that using asyncio.
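asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Calling put_nowait from another thread adds the item, but it does not wake the event loop, so the coroutine waiting in get() is never resumed. Instead, you can use janus, a third-party library that provides a thread-aware asyncio queue.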

    import asyncio
    import threading
    import time

    import janus


    def threaded(squeue):
        # Runs in the worker thread: use the synchronous side of the queue.
        while True:
            time.sleep(2)
            squeue.put_nowait(time.time())
            print(squeue.qsize())


    async def async_func(aqueue):
        # Runs on the event loop: use the asynchronous side of the queue.
        while True:
            timestamp = await aqueue.get()
            print(timestamp)


    async def main():
        # Create the queue while the event loop is running.
        queue = janus.Queue()
        threading.Thread(target=threaded, args=(queue.sync_q,)).start()
        await async_func(queue.async_q)


    asyncio.run(main())

There is also aioprocessing (full disclosure: I wrote it), which provides process-safe (and, as a side effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.
Edit
As pointed out in other answers, for simple use cases you can also use loop.call_soon_threadsafe to add to the queue.
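
For reference, here is a minimal sketch of that approach using only the standard library. The names producer and consume are made up for illustration, the short sleep stands in for a blocking serial read, and the item count is bounded so the script terminates. Treat it as one way to wire this up, not the only one.

```python
import asyncio
import threading
import time


def producer(loop, queue, count):
    # Runs in a worker thread. asyncio.Queue is not thread-safe, so the
    # put is scheduled on the event loop's thread instead of being called
    # directly from here.
    for i in range(count):
        time.sleep(0.01)  # stand-in for a blocking read (assumption)
        loop.call_soon_threadsafe(queue.put_nowait, i)


async def consume(count):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    worker = threading.Thread(target=producer, args=(loop, queue, count))
    worker.start()

    items = []
    for _ in range(count):
        # Resumed by the event loop each time the scheduled put runs.
        items.append(await queue.get())

    worker.join()  # the worker has already posted everything by now
    return items


if __name__ == "__main__":
    print(asyncio.run(consume(3)))
```

Because call_soon_threadsafe wakes the event loop after scheduling the callback, the coroutine blocked in get() gets resumed, which is exactly what the direct put_nowait call from the thread fails to do.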