pythonpython-asyncio

asyncio run coroutine from a synchronous function


How can I call task2 from func without declaring func async and awaiting it? My first thought was to create a thread and use run_coroutine_threadsafe but it deadlocks. Same as not using a thread. Do I have to start a new loop?

import asyncio
from threading import Thread

async def task2():
    print("starting task2...")
    await asyncio.sleep(1)
    print("finished task2.")
    return "done"

def func(loop=None):
    print("running func...")
    if not loop:
        loop = asyncio.get_running_loop()
    assert loop
    future = asyncio.run_coroutine_threadsafe(
        task2(), 
        loop)
    result = future.result()
    print(f"{result=}")
    print("done func...")

async def task1():
    print("starting task1...")
    await asyncio.sleep(1)
    # func()
    loop = asyncio.get_running_loop()
    t = Thread(target=func, args=(loop,))
    t.start()
    t.join()
    print("finished task1.")

if __name__ == '__main__':
    asyncio.run(task1())

Solution

  • python threading synchronization primitives such as Thread.join don't work well with asyncio because they suspend the thread and therefore block the running eventloop, so run_coroutine_threadsafe cannot use the blocked eventloop.

    Instead you have loop.run_in_executor for creating threaded tasks that don't block the eventloop.

    import asyncio
    
    async def task2():
        print("starting task2...")
        await asyncio.sleep(1)
        print("finished task2.")
        return "done"
    
    def func(loop: asyncio.AbstractEventLoop):
        print("running func...")
    
        future = asyncio.run_coroutine_threadsafe(
            task2(),
            loop)
        result = future.result()
        print(f"{result=}")
        print("done func...")
    
    async def task1():
        print("starting task1...")
        await asyncio.sleep(1)
        loop = asyncio.get_running_loop()
        task = loop.run_in_executor(None, func, loop)
        await task  # doesn't block the eventloop
        print("finished task1.")
    
    if __name__ == '__main__':
        asyncio.run(task1())
    
    starting task1...
    running func...
    starting task2...
    finished task2.
    result='done'
    done func...
    finished task1.
    

    Each evenloop has a default ThreadPoolExecutor that it uses if you pass None as the executor, but it has a limited number of workers and is intended for computational and non-blocking work, while you are using it for blocking work, so it may be beneficial to override it with a larger number of workers using loop.set_default_executor, or you can have each part of your codebase use a different ThreadPoolExecutor with a different size to avoid one part hogging all the workers with blocking tasks. threads are created lazily anyway.


    Another solution is to use asyncio.run which creates a new eventloop on the current thread to run this coroutine. just make sure you only call it on a thread that doesn't already have an eventloop running or it will throw an exception.

    I think sending back the coroutine to the original loop as you are doing is a good solution, and is actually the most performant, otherwise you can create a shared daemon thread that has an eventloop just for the sake of sending coroutines to it.