I have a program with one main thread where I spawn a second thread that uses asyncio. Are there any tools provided to synchronize these two threads? If everything were asyncio, I could do it with its synchronization primitives, e.g.:
import asyncio

async def taskA(lst, evt):
    print('Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

lst = []
evt = asyncio.Event()
asyncio.get_event_loop().run_until_complete(asyncio.gather(
    taskA(lst, evt),
    taskB(lst, evt),
))
However, this does not work with multiple threads. If I just use a threading.Event, it will block the asyncio thread. I figured out I could defer the wait to an executor:
import asyncio
import threading

def taskA(lst, evt):
    print('Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    # Block a pool thread on evt.wait() instead of blocking the event loop
    await asyncio.get_event_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()
However, having an executor thread only to wait for a mutex seems unnatural. Is this the way this is supposed to be done? Or is there any other way to wait for synchronization between OS threads asynchronously?
A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.
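A minimal sketch of that approach might look like the following. It assumes, for brevity, that the event loop runs in the calling thread and that taskA runs in a plain thread started from the coroutine; the extra `loop` parameter and the `run_demo` helper are illustrative additions, not part of the original code.

```python
import asyncio
import threading


def taskA(lst, loop, evt):
    # Runs in an ordinary OS thread, outside the event loop.
    lst.append(1)
    # asyncio.Event is not thread-safe, so instead of calling evt.set()
    # directly, ask the loop to call it from its own thread.
    loop.call_soon_threadsafe(evt.set)


async def taskB(lst):
    loop = asyncio.get_running_loop()
    evt = asyncio.Event()
    thread = threading.Thread(target=taskA, args=(lst, loop, evt))
    thread.start()
    await evt.wait()  # suspends only this coroutine, not the whole loop
    thread.join()     # taskA has nothing left to do, so this returns quickly
    return lst.pop()


def run_demo():
    # Hypothetical helper: runs taskB on a fresh event loop.
    return asyncio.run(taskB([]))
```

The thread needs a reference to the loop that owns the event, which is why `loop` is passed to it explicitly here.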
To be able to pass values and exceptions between the two, you can use futures; however, at that point you are reinventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well make a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:
import asyncio
import concurrent.futures

worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_event_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())
The semantics are the same as in your version with an explicit queue: the queue is still there, it's just inside the ThreadPoolExecutor.
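For comparison, the hand-rolled futures variant mentioned earlier could be sketched roughly as below. The names `worker_thread`, `compute` and `run_demo`, and the integer division used as the "work", are made up for illustration. The thread never touches the future directly; it hands the result or the exception to the loop via call_soon_threadsafe.

```python
import asyncio
import threading


def worker_thread(loop, fut, x):
    # Runs in an ordinary OS thread. Futures are not thread-safe, so
    # results and exceptions are delivered through the loop.
    try:
        result = 10 // x
    except Exception as exc:
        loop.call_soon_threadsafe(fut.set_exception, exc)
    else:
        loop.call_soon_threadsafe(fut.set_result, result)


async def compute(x):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    threading.Thread(target=worker_thread, args=(loop, fut, x)).start()
    # Returns the worker's value, or re-raises its exception here.
    return await fut


def run_demo(x):
    # Hypothetical helper: runs compute() on a fresh event loop.
    return asyncio.run(compute(x))
```

This is essentially the bookkeeping that run_in_executor already does for you, which is why the executor version is usually the simpler choice.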