
How to process tasks as they complete when using TaskGroup?


I understand the arguments for using the newer TaskGroup in place of older mechanisms based on create_task().

However, the async with TaskGroup() block only exits after all of its tasks are done. What if you want to start processing results as soon as the first task finishes, as you can with asyncio.as_completed() or asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)?

If TaskGroup offers no way to do that, then it's a trade-off rather than a strict upgrade, isn't it?
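For reference, the pre-TaskGroup pattern the question refers to, processing plain create_task() tasks with asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED), looks roughly like this. It is a minimal sketch: the work coroutine and its delays are hypothetical, chosen so that tasks started later finish first.

```python
import asyncio


async def work(i: int) -> int:
    # Hypothetical workload: higher i sleeps less, so it finishes sooner.
    await asyncio.sleep(0.05 * (4 - i))
    return 100 + i


async def main() -> list[int]:
    pending = {asyncio.create_task(work(i)) for i in range(1, 4)}
    results = []
    while pending:
        # Returns as soon as at least one pending task has finished.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            # Handle each result immediately, in completion order.
            results.append(task.result())
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Nothing here cancels the remaining tasks if one of them fails, which is one of the things TaskGroup handles for you.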


Solution

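  • A callback-based solution: attach a done callback to each task with add_done_callback(), so each result is handled as soon as its own task finishes, without waiting for the whole group: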

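    import asyncio
    
    def task_done(task):
        # Done callback: scheduled as soon as this particular task finishes,
        # while the other tasks in the group may still be running.
        try:
            value = task.result()
            msg = f"finished, return value: {value}"
        except asyncio.CancelledError:
            # result() raises CancelledError for a cancelled task.
            msg = "cancelled"
        except Exception as err:
            msg = f"failed: {repr(err)}"
        print(f"{task.get_name()} {msg}")
    
    async def coro(i):
        await asyncio.sleep(i)
        if i == 4:
            1 / 0  # deliberate failure to show how errors are reported
        return 100 + i
    
    async def main():
        try:
            async with asyncio.TaskGroup() as tg:  # requires Python 3.11+
                for i in range(1, 10):
                    task = tg.create_task(coro(i), name=f"task #{i}")
                    task.add_done_callback(task_done)
        except ExceptionGroup as group:
            # When a task fails, the TaskGroup cancels the remaining tasks
            # and raises the collected exceptions as an ExceptionGroup.
            print(f"{len(group.exceptions)} exception(s) in the task group")
    
    if __name__ == '__main__':
        asyncio.run(main())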

    Output (the order of the "cancelled" lines may differ between runs):

    task #1 finished, return value: 101
    task #2 finished, return value: 102
    task #3 finished, return value: 103
    task #4 failed: ZeroDivisionError('division by zero')
    task #8 cancelled
    task #9 cancelled
    task #7 cancelled
    task #6 cancelled
    task #5 cancelled
    1 exception(s) in the task group