python python-asyncio

How to properly close gather tasks if some of them take too long


I have code that looks like this:

import asyncio

stop = False  # shared flag that tells the watcher loops to exit

async def watch_task1():
    while not stop:
        await client.ws.get_data()

async def watch_task2():
    while not stop:
        await client.ws.get_news()

async def stop_after():
    global stop
    await client.sleep(60)
    stop = True

async def main():
    tasks = [
        watch_task1(),
        watch_task2(),
        stop_after(),
    ]

    try:
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(e)

My problem is that client.ws.get_data() and client.ws.get_news() rarely receive messages. It can take up to 24 hours before an await client.ws call returns, so the stop flag is not checked until then and the program keeps running. I want it to stop after 60 seconds at most, whether the tasks have finished or not.

How could I do this?


Solution

  • You can either wrap each task in asyncio.wait_for, so that each has its own timeout, or use asyncio.wait instead of gather: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait. It takes an optional timeout argument. Note that asyncio.wait does not cancel the tasks that are still running when the timeout expires; it simply returns them in a second set. If you want to retry some of the unfinished tasks, wrap the call in a loop. Otherwise, if you handle all unfinished tasks the same way (for example, by passing them to a long-running manager task), a single call will do.

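    import asyncio

    async def main():
        # asyncio.wait() needs Task objects: passing bare coroutines was
        # deprecated in Python 3.8 and raises TypeError since 3.11.
        tasks = [
            asyncio.create_task(watch_task1()),
            asyncio.create_task(watch_task2()),
            asyncio.create_task(stop_after()),
        ]

        # returns after 60 seconds at the latest, even if some tasks are still running
        done, waiting = await asyncio.wait(tasks, timeout=60, return_when=asyncio.ALL_COMPLETED)
        for task in done:
            try:
                # but check the docs, there are better ways to check for exceptions. I am just transposing your code
                task.result()
            except Exception as e:
                print(e)
        # at this point you have the uncompleted tasks in the `waiting` set, and can do whatever you want to them.
        ...
        for task in waiting:
            task.cancel()
        # give the cancelled tasks a chance to unwind before main() returns
        await asyncio.gather(*waiting, return_exceptions=True)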
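
The other option mentioned above, wrapping each task in asyncio.wait_for, could look roughly like the sketch below. It is only an illustration: fake_receive, watcher and run_with_deadline are made-up names, with fake_receive standing in for client.ws.get_data() / client.ws.get_news(), and the short timeout is there just to keep the demo fast. On timeout, wait_for cancels the wrapped coroutine and raises asyncio.TimeoutError, which gather collects here because of return_exceptions=True.

```python
import asyncio

received = []  # messages collected by the watchers (demo only)


async def fake_receive(name, delay):
    # Hypothetical stand-in for client.ws.get_data(): the message
    # arrives after `delay` seconds.
    await asyncio.sleep(delay)
    return f"{name} message"


async def watcher(name, delay):
    # Loops forever, like the watch_task functions in the question.
    while True:
        received.append(await fake_receive(name, delay))


async def run_with_deadline(timeout):
    # Each watcher gets its own deadline. When it expires, wait_for
    # cancels the watcher and raises asyncio.TimeoutError, which
    # gather() returns as a value because return_exceptions=True.
    return await asyncio.gather(
        asyncio.wait_for(watcher("data", 0.05), timeout),
        asyncio.wait_for(watcher("news", 3600), timeout),
        return_exceptions=True,
    )


results = asyncio.run(run_with_deadline(0.3))
print(results)   # one TimeoutError per watcher
print(received)  # only "data message" entries; "news" never arrived
```

With this approach nothing is left to cancel by hand, because wait_for has already cancelled each watcher by the time gather returns. In the original program the timeout would be 60 and the stop flag would no longer be needed.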