
Can I make an asynchronous function indistinguishable from a synchronous one?


If I can write a function in either synchronous or asynchronous fashion (e.g. for retrieving some data from an API), I would ideally like to implement it only once, as asynchronous, and run it synchronously whenever I call it from non-async code.

I'm therefore looking for a run function with the following signature:

def run(coro: Coroutine[Any, Any, _ReturnT]) -> _ReturnT: ...

An easy solution would be to simply use the run function defined in asyncio:

run1 = asyncio.run

The problem is that if I wrap this function into a synchronous version:

def sync_f():
    return run(async_f())

Then I can't use sync_f exactly as I would a synchronous function. To see this, imagine another (sync) module builds on sync_f to do other synchronous stuff:

def sync_g():
    print("Doing some synchronous things")
    res = sync_f()
    print("Doing some other synchronous things")
    return res

And finally, suppose some asynchronous function async_h wants to use the sync_g logic:

async def async_h():
    print("Doing some asynchronous things")
    res = sync_g()
    print("Doing some other asynchronous things")
    return res

Then running that last function, for instance in a __main__ block with asyncio.run(async_h()), will result in the following error: RuntimeError: asyncio.run() cannot be called from a running event loop.
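For reference, the failure can be reproduced in one self-contained file. This is only a minimal sketch: the body of async_f is an assumption (the real one would call an API), and the try/except in sync_f exists only to close the unused coroutine so Python does not also emit a "never awaited" warning.

```python
import asyncio


async def async_f():
    # Placeholder body (assumption): stands in for a real API call.
    await asyncio.sleep(0)
    return 42


def sync_f():
    coro = async_f()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        # asyncio.run() refused to start, so the coroutine was never scheduled.
        coro.close()
        raise


async def async_h():
    # A sync helper called from async code: this is where it breaks.
    return sync_f()


def reproduce():
    """Return the error message raised when sync_f runs inside a loop."""
    try:
        asyncio.run(async_h())
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(sync_f())       # fine: no loop is running in this thread
    print(reproduce())    # the RuntimeError message from the nested call
```

Called from plain synchronous code, sync_f works; only the call path that passes through a running event loop fails.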

I tried to be a bit smarter with my definition of run by checking whether an outer loop is already running and, if so, running my coroutine in it:

def run2(coro: Coroutine[Any, Any, _ReturnT]) -> _ReturnT:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    else:
        return loop.run_until_complete(coro)

But to no avail: RuntimeError: This event loop is already running. That makes sense, but I couldn't find anything that would reuse the currently running loop (something like a hypothetical loop.wait_until_complete(coro)).

Is there any way to wrap an asynchronous function into a normal one that behaves exactly like a synchronous one, without the asynchronous implementation detail leaking into higher-level contexts?


Solution

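  • I finally found an answer to this question. One can create a runner that keeps a dedicated thread with its own event loop for running asynchronous code. Synchronous callers submit a coroutine to that loop and block until its result is ready, so it works whether or not the calling thread already has a running loop: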

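    import asyncio
    import threading


    class AsyncRunner:
        """Runs coroutines on an event loop owned by a background thread."""

        def __init__(self):
            # A private loop, driven forever by a daemon thread.
            self._loop = asyncio.new_event_loop()
            self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
            self._thread.start()

        def run(self, coro):
            # Schedule the coroutine on the background loop and block the
            # calling thread until the result (or exception) is available.
            return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

        def close(self):
            # Ask the loop to stop from its own thread, then clean up.
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join()
            self._loop.close()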
    

    You can (and probably should) add some bells and whistles to the thread management (maybe using a context manager), but this is the gist of the solution.
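    As one possible take on those bells and whistles, here is a sketch that adds context-manager support and wires the runner into the sync_f / sync_g / async_h chain from the question. The __enter__/__exit__ methods, the guard in run, the async_f body and the main helper are all my own additions for illustration, not part of the original answer.

```python
import asyncio
import threading


class AsyncRunner:
    """Runs coroutines on an event loop owned by a background thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def run(self, coro):
        # Guard (an addition): blocking on .result() from the runner's own
        # thread would stall the loop that has to produce that result.
        if threading.current_thread() is self._thread:
            coro.close()
            raise RuntimeError("AsyncRunner.run() called from its own loop thread")
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def async_f():
    # Placeholder body (assumption): stands in for a real API call.
    await asyncio.sleep(0.01)
    return "data"


def main():
    with AsyncRunner() as runner:

        def sync_f():
            return runner.run(async_f())

        def sync_g():
            return sync_f()

        async def async_h():
            return sync_g()

        plain = sync_g()                 # called from ordinary sync code
        nested = asyncio.run(async_h())  # called from inside a running loop
    return plain, nested


if __name__ == "__main__":
    print(main())
```

    One caveat to keep in mind: while sync_g waits inside async_h, the calling event loop is blocked, so other tasks on that loop make no progress until the result comes back. The wrapper hides the async implementation, but it behaves like any other blocking call.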