taskpython-asyncioevent-loopsuspend

Is it possible to suspend and restart tasks in async Python?


The question should be simple enough, but I couldn't find anything about it.

I have an async Python program that contains a rather long-running task that I want to be able to suspend and restart at arbitrary points (arbitrary of course meaning everywhere where there's an await keyword).

I was hoping there was something along the lines of task.suspend() and task.resume() but it seems there isn't.

Is there an API for this on task- or event-loop-level or would I need to do this myself somehow? I don't want to place an event.wait() before every await...


Solution

  • What you're asking for is possible, but not trivial. First, note that you can never have suspends on every await, but only on those that result in suspension of the coroutine, such as asyncio.sleep(), or a stream.read() that doesn't have data ready to return. Awaiting a coroutine immediately starts executing it, and if the coroutine can return immediately, it does so without dropping to the event loop. await only suspends to the event loop if the awaitee (or its awaitee, etc.) requests it. More details in these questions: [1], [2], [3], [4].

    With that in mind, you can use the technique from this answer to intercept each resumption of the coroutine with additional code that checks whether the task is paused and, if so, waits for the resume event before proceeding.

    import asyncio
    
    class Suspendable:
        def __init__(self, target):
            self._target = target
            self._can_run = asyncio.Event()
            self._can_run.set()
            self._task = asyncio.ensure_future(self)
    
        def __await__(self):
            target_iter = self._target.__await__()
            iter_send, iter_throw = target_iter.send, target_iter.throw
            send, message = iter_send, None
            # This "while" emulates yield from.
            while True:
                # wait for can_run before resuming execution of self._target
                try:
                    while not self._can_run.is_set():
                        yield from self._can_run.wait().__await__()
                except BaseException as err:
                    send, message = iter_throw, err
    
                # continue with our regular program
                try:
                    signal = send(message)
                except StopIteration as err:
                    return err.value
                else:
                    send = iter_send
                try:
                    message = yield signal
                except BaseException as err:
                    send, message = iter_throw, err
    
        def suspend(self):
            self._can_run.clear()
    
        def is_suspended(self):
            return not self._can_run.is_set()
    
        def resume(self):
            self._can_run.set()
    
        def get_task(self):
            return self._task
    

    Test:

    import time
    
    async def heartbeat():
        while True:
            print(time.time())
            await asyncio.sleep(.2)
    
    async def main():
        task = Suspendable(heartbeat())
        for i in range(5):
            print('suspending')
            task.suspend()
            await asyncio.sleep(1)
            print('resuming')
            task.resume()
            await asyncio.sleep(1)
    
    asyncio.run(main())