If a function can be written either synchronously or asynchronously (e.g. to retrieve some data from an API), I would ideally like to implement it only once, as asynchronous, and run it synchronously whenever I need it inside a non-async function.
I'm therefore looking for a run function with the following signature:
def run(coro: Coroutine[Any, Any, _ReturnT]) -> _ReturnT: ...
An easy solution would be to simply use the run function defined in asyncio:
run1 = asyncio.run
The problem appears when I use it to wrap an asynchronous function async_f into a synchronous version:
def sync_f():
    return run(async_f())
Then I can't use sync_f exactly like a synchronous function.
To see this, imagine another (sync) module builds on sync_f to do other synchronous stuff:
def sync_g():
    print("Doing some synchronous things")
    res = sync_f()
    print("Doing some other synchronous things")
    return res
And finally, suppose some asynchronous function async_h wants to use the sync_g logic:
async def async_h():
    print("Doing some asynchronous things")
    res = sync_g()
    print("Doing some other asynchronous things")
    return res
Then running that last function, for instance in a __main__ block with asyncio.run(async_h()), will result in the following error: RuntimeError: asyncio.run() cannot be called from a running event loop.
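For reference, the whole chain can be reproduced in a few lines. This is only a minimal sketch: the body of async_f (a short sleep returning 42) and the demo helper are made up for illustration, and the print calls are left out.

```python
import asyncio


async def async_f():
    # Hypothetical stand-in for a real async API call.
    await asyncio.sleep(0)
    return 42


def sync_f():
    coro = async_f()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        # The coroutine never started; close it to avoid a
        # "coroutine was never awaited" warning, then re-raise.
        coro.close()
        raise


def sync_g():
    return sync_f()


async def async_h():
    # Calling the sync wrapper while an event loop is already running.
    return sync_g()


def demo():
    """Return the error message raised by the nested call, or None."""
    try:
        asyncio.run(async_h())
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(sync_f())  # works: no event loop is running here
    print(demo())    # fails: asyncio.run() is called inside a running loop
```

Called from plain synchronous code, sync_f behaves as expected; the failure only shows up once an event loop is already running further up the call stack.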
I tried to be a bit smarter with my definition of run, checking whether a higher-level loop is currently running and, if so, running my coroutine in it:
def run2(coro: Coroutine[Any, Any, _ReturnT]) -> _ReturnT:
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    else:
        return loop.run_until_complete(coro)
But to no avail: RuntimeError: This event loop is already running.
This makes sense, but I couldn't find anything that would reuse the currently running loop (something like loop.wait_until_complete(coro)).
Isn't there any way to wrap an asynchronous function into a normal one that will work exactly as a synchronous one, without having the implementation detail of the asynchronous version leaking into the higher contexts?
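I finally found the answer to this question! One can create a runner which keeps a thread dedicated to running asynchronous code. This works for the nested case above, with the caveat that the calling thread (and any event loop running on it) is blocked until the coroutine finishes: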
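import asyncio
import threading


class AsyncRunner:
    def __init__(self):
        # Dedicated event loop, run forever on a background daemon thread.
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro):
        # Schedule the coroutine on the background loop and block until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self):
        # Ask the loop to stop from its own thread, wait for the thread, then close the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()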
You can (and probably should) add some bells and whistles to the thread management (maybe using a context manager), but this is the gist of the solution.
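As one possible take on the context-manager idea, here is a sketch that adds __enter__ and __exit__ to the runner and exercises it through the sync_f / sync_g / async_h chain from the question. Passing the runner explicitly as an argument, the body of async_f, and the demo helper are assumptions made for illustration; a module-level runner would work just as well.

```python
import asyncio
import threading
from typing import Any, Coroutine, TypeVar

_ReturnT = TypeVar("_ReturnT")


class AsyncRunner:
    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, _ReturnT]) -> _ReturnT:
        # Blocks the calling thread until the coroutine has finished
        # on the background loop.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def __enter__(self) -> "AsyncRunner":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Always shut down the background thread, even on error.
        self.close()


async def async_f() -> int:
    # Hypothetical stand-in for a real async API call.
    await asyncio.sleep(0.01)
    return 42


def sync_f(runner: AsyncRunner) -> int:
    return runner.run(async_f())


def sync_g(runner: AsyncRunner) -> int:
    return sync_f(runner)


async def async_h(runner: AsyncRunner) -> int:
    # Works even though an event loop is already running in this thread,
    # because async_f executes on the runner's own loop and thread.
    return sync_g(runner)


def demo() -> int:
    with AsyncRunner() as runner:
        return asyncio.run(async_h(runner))


if __name__ == "__main__":
    print(demo())
```

Note that run must not be called from a coroutine that is itself executing on the runner's loop: the blocking result() call would then wait on the very thread that has to do the work, and never return.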