pythonmultithreadingsynchronizationpython-asyncio

How can I synchronize asyncio with other OS threads?


I have a program with one main thread where I spawn a second thread that uses asyncio. Are there any tools provided to synchronize these two threads? If everything was asyncio, I could do it with its synchronization primitives, eg:

import asyncio

async def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

lst = []
evt = asyncio.Event()
asyncio.get_event_loop().run_until_complete(asyncio.gather(
    taskA(lst, evt),
    taskB(lst, evt),
))

However, this does not work with multiple threads. If I just use a threading.Event then it will block the asyncio thread. I figured out I could defer the wait to an executor:

import asyncio
import threading

def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await asyncio.get_event_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()

However, having an executor thread only to wait for a mutex seems unnatural. Is this the way this is supposed to be done? Or is there any other way to wait for synchronization between OS threads asynchronously?


Solution

  • A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.

    To be able to pass values and exceptions between the two, you can use futures; however then you are inventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well make a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:

    worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    
    async def taskB(lst):
        loop = asyncio.get_event_loop()
        # or result = await ..., if taskA has a useful return value
        # This will also propagate exceptions raised by taskA
        await loop.run_in_executor(worker, taskA, lst)
        print('Retrieved:', lst.pop())
    

    The semantics are the same as in your version with an explicit queue - the queue is still there, it's just inside the ThreadPoolExecutor.