As my project heavily relies on asynchronous network I/O, I always have to expect some weird network error to occur: whether it is the service I'm connecting to having an API outage, or my own server having a network issue, or something else. Issues like that appear, and there's no real way around it. So, I eventually ended up trying to figure out a way to effectively "pause" a coroutine's execution from outside whenever such a network issue occured, until the connection has been reestablished. My approach is writing a decorator pausable
that takes an argument pause
which is a coroutine function that will be yield
ed from
/ await
ed like this:
def pausable(pause, resume_check=None, delay_start=None):
if not asyncio.iscoroutinefunction(pause):
raise TypeError("pause must be a coroutine function")
if not (delay_start is None or asyncio.iscoroutinefunction(delay_start)):
raise TypeError("delay_start must be a coroutine function")
def wrapper(coro):
@asyncio.coroutine
def wrapped(*args, **kwargs):
if delay_start is not None:
yield from delay_start()
for x in coro(*args, **kwargs):
try:
yield from pause()
yield x
# catch exceptions the regular discord.py user might not catch
except (asyncio.CancelledError,
aiohttp.ClientError,
websockets.WebSocketProtocolError,
ConnectionClosed,
# bunch of other network errors
) as ex:
if any((resume_check() if resume_check is not None else False and
isinstance(ex, asyncio.CancelledError),
# clean disconnect
isinstance(ex, ConnectionClosed) and ex.code == 1000,
# connection issue
not isinstance(ex, ConnectionClosed))):
yield from pause()
yield x
else:
raise
return wrapped
return wrapper
Pay special attention to this bit:
for x in coro(*args, **kwargs):
yield from pause()
yield x
Example usage (ready
is an asyncio.Event
):
@pausable(ready.wait, resume_check=restarting_enabled, delay_start=ready.wait)
@asyncio.coroutine
def send_test_every_minute():
while True:
yield from client.send("Test")
yield from asyncio.sleep(60)
However, this does not seem to work and it does not seem like an elegant solution to me. Is there a working solution that is compatible with Python 3.5.3 and above? Compatibility with Python 3.4.4 and above is desirable.
Just try
/except
ing the exceptions raised in the coroutine that needs to be paused is neither always possible nor a viable option to me as it heavily violates against a core code design principle (DRY) I'd like to comply with; in other words, excepting so many exceptions in so many coroutine functions would make my code messy.
Few words about current solution.
for x in coro(*args, **kwargs):
try:
yield from pause()
yield x
except
...
You won't be able to catch exceptions this way:
.
@asyncio.coroutine
def test():
yield from asyncio.sleep(1)
raise RuntimeError()
yield from asyncio.sleep(1)
print('ok')
@asyncio.coroutine
def main():
coro = test()
try:
for x in coro:
try:
yield x
except Exception:
print('Exception is NOT here.')
except Exception:
print('Exception is here.')
try:
next(coro)
except StopIteration:
print('And after first exception generator is exhausted.')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
Output:
Exception is here.
And after first exception generator is exhausted.
Even if it was possible to resume, consider what will happen if coroutine already did some cleanup operations due to exception.
Given all above, if some coroutine raised exception only option you have is to suppress this exception (if you want) and re-run this coroutine. You can rerun it after some event if you want. Something like this:
def restart(ready_to_restart):
def wrapper(func):
@asyncio.coroutine
def wrapped(*args, **kwargs):
while True:
try:
return (yield from func(*args, **kwargs))
except (ConnectionClosed,
aiohttp.ClientError,
websockets.WebSocketProtocolError,
ConnectionClosed,
# bunch of other network errors
) as ex:
yield from ready_to_restart.wait()
ready_to_restart = asyncio.Event() # set it when you sure network is fine
# and you're ready to restart
Upd
However, how would I make the coroutine continue where it was interrupted now?
Just to make things clear:
@asyncio.coroutine
def test():
with aiohttp.ClientSession() as client:
yield from client.request_1()
# STEP 1:
# Let's say line above raises error
# STEP 2:
# Imagine you you somehow maged to return to this place
# after exception above to resume execution.
# But what is state of 'client' now?
# It's was freed by context manager when we left coroutine.
yield from client.request_2()
Nor functions, nor coroutines are designed to resume their execution after exception was propagated outside from them.
Only thing that comes to mind is to split complex operation to re-startable little ones while whole complex operation can store its state:
@asyncio.coroutine
def complex_operation():
with aiohttp.ClientSession() as client:
res = yield from step_1(client)
# res/client - is a state of complex_operation.
# It can be used by re-startable steps.
res = yield from step_2(client, res)
@restart(ready_to_restart)
@asyncio.coroutine
def step_1():
# ...
@restart(ready_to_restart)
@asyncio.coroutine
def step_2():
# ...