python-asyncio

Python: Get reference to original task after ordering tasks by completion


Question: After asyncio.as_completed yields a result, how do you get a reference to the original task?

Basically the same as this C# question, except in Python: Getting reference to original Task after ordering Tasks by completion?

Example Problem:

import asyncio

# Takes a list of WebClient objects,
# calls each one simultaneously,
# and yields the results immediately as they arrive
# to a synchronous caller.

def yieldThingsAsTheyArrive(webClients):

    tasks = []
    for webClient in webClients:
        # This is what we want to get a reference to later:
        task = webClient.fetch_thing()  # start long-running request asynchronously
        tasks.append(task)

    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed(tasks):
        # Our caller is synchronous, so wait until the task completes
        # and yield the final result instead of a future.
        thing = loop.run_until_complete(future)
        thing.originalWebClient = ???  # This is where we need a reference to the original webClient
        yield thing

Solution

  • as_completed is unusual in that it yields neither the futures themselves (which asyncio.wait returns) nor their results (which asyncio.gather returns). Instead, it yields coroutines that you need to await, in whatever way you like, to get the results in completion order. It cannot yield the futures you passed to it because, at the moment it yields, it does not yet know which of them will complete next.

    You can associate arbitrary data by wrapping the task in another future whose result is the task object itself (to which you have attached your data). This is essentially what the C# code does, only without the static-typing ceremony. Taking the setup from this answer, a runnable example looks like this:

    import asyncio
    
    async def first():
        await asyncio.sleep(5)
        return 'first'
    
    async def second():
        await asyncio.sleep(1)
        return 'second'
    
    async def third():
        await asyncio.sleep(3)
        return 'third'
    
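    def ordinary_generator():
        # Use a dedicated loop and make it current, so that as_completed(),
        # which looks up the current event loop, schedules its work on it too.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        wrappers = []
        for idx, coro in enumerate((first(), second(), third()), start=1):
            task = loop.create_task(coro)
            task.idx = idx  # arbitrary data attached to the task
            # Wrap the task in a future that completes when the
            # task does, but whose result is the task object itself.
            wrapper = loop.create_future()
            task.add_done_callback(wrapper.set_result)
            wrappers.append(wrapper)
    
        for x in asyncio.as_completed(wrappers):
            # as_completed yields awaitables; running one to completion
            # returns the original task, with its attached data.
            yield loop.run_until_complete(x)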
    
    for task in ordinary_generator():
        print(task.result(), task.idx)
    

    The other option, which I would recommend, is to replace iteration over as_completed with a loop that calls asyncio.wait(return_when=FIRST_COMPLETED). This also provides futures as they complete, but without the extra wrapping, and results in slightly more idiomatic asyncio code. We call create_task on each coroutine to turn it into a task (a kind of future), attach data to it, and only then pass it to asyncio.wait(). Since wait returns those same task objects, the attached data is still on them. Note that done is a set, so when several tasks finish in the same iteration their relative order is arbitrary.

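    def ordinary_generator():
        # asyncio.wait() only uses the running loop, so a private loop suffices.
        loop = asyncio.new_event_loop()
    
        pending = []
        for idx, coro in enumerate((first(), second(), third()), start=1):
            task = loop.create_task(coro)
            task.idx = idx
            pending.append(task)
    
        while pending:
            # wait() returns two sets holding the very task objects we
            # passed in, so the attached idx is still on them.
            done, pending = loop.run_until_complete(asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED))
            for task in done:
                yield task
    
    for task in ordinary_generator():
        print(task.result(), task.idx)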
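The difference between as_completed and asyncio.wait described above can be checked directly. This is a minimal sketch, assuming Python 3.7+ for asyncio.run(); the work() helper, the task names and the delays are made up for illustration:

```python
import asyncio


async def work(name, delay):
    # Hypothetical workload: sleep, then return the name.
    await asyncio.sleep(delay)
    return name


async def main():
    # Three tasks that finish in the order b, c, a.
    tasks = [
        asyncio.create_task(work("a", 0.3)),
        asyncio.create_task(work("b", 0.1)),
        asyncio.create_task(work("c", 0.2)),
    ]

    # Plain iteration over as_completed() yields new awaitables, never the
    # tasks passed in; awaiting one produces a result, not a task.
    order = []
    for aw in asyncio.as_completed(tasks):
        assert all(aw is not t for t in tasks)
        order.append(await aw)

    # asyncio.wait() returns sets containing the very same task objects.
    more = [asyncio.create_task(work(n, 0.01)) for n in "xyz"]
    done, pending = await asyncio.wait(more)
    same_objects = done == set(more) and not pending
    return order, same_objects


order, same_objects = asyncio.run(main())
print(order)         # ['b', 'c', 'a']
print(same_objects)  # True
```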
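Applied to the question's own example, the asyncio.wait() loop might look roughly like the sketch below. WebClient and fetch_thing() are hypothetical stand-ins that just sleep, and the camelCase names are kept from the question. Instead of setting an attribute on each task, this sketch keeps a dict from task to client, which works because tasks are hashable. Using a private event loop and cancelling leftover tasks when the caller stops early are design choices of this sketch, not part of the answer above.

```python
import asyncio
from types import SimpleNamespace


class WebClient:
    """Hypothetical stand-in for the question's WebClient."""

    def __init__(self, name, delay):
        self.name = name
        self.delay = delay

    async def fetch_thing(self):
        # Simulate a long-running request.
        await asyncio.sleep(self.delay)
        return SimpleNamespace(payload=f"thing from {self.name}")


def yieldThingsAsTheyArrive(webClients):
    loop = asyncio.new_event_loop()
    pending = set()
    try:
        # Remember which client started each task.
        clientFor = {loop.create_task(c.fetch_thing()): c for c in webClients}
        pending = set(clientFor)
        while pending:
            done, pending = loop.run_until_complete(
                asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED))
            for task in done:
                thing = task.result()  # re-raises if the request failed
                thing.originalWebClient = clientFor[task]
                yield thing
    finally:
        # If the caller stops early, cancel and drain the remaining tasks
        # before closing the loop.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))
        loop.close()


clients = [WebClient("slow", 0.3), WebClient("fast", 0.1), WebClient("medium", 0.2)]
for thing in yieldThingsAsTheyArrive(clients):
    print(thing.payload, "<-", thing.originalWebClient.name)
```

Run as a script, this should print the fast, medium and slow results in that order, each tagged with the client that produced it.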
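If you would rather keep as_completed(), a lighter variant of the wrapping idea is to wrap each coroutine in a small coroutine that returns a (tag, result) pair, so the label travels with the result. This is a different technique from the future wrapper in the answer, and the tagged() and work() helpers below are hypothetical:

```python
import asyncio


async def tagged(tag, aw):
    # Hypothetical helper: await the original awaitable and return its
    # result paired with the tag, so each as_completed() result carries
    # its own label.
    return tag, await aw


async def work(name, delay):
    # Hypothetical workload standing in for a real request.
    await asyncio.sleep(delay)
    return name.upper()


async def main():
    delays = {"first": 0.3, "second": 0.1, "third": 0.2}
    wrapped = [tagged(name, work(name, d)) for name, d in delays.items()]
    results = []
    for aw in asyncio.as_completed(wrapped):
        tag, value = await aw
        results.append((tag, value))
    return results


results = asyncio.run(main())
print(results)
# [('second', 'SECOND'), ('third', 'THIRD'), ('first', 'FIRST')]
```

The tag can be any object, for instance the WebClient itself. The trade-off is that you get back a (tag, result) pair rather than the task, so the task object itself is not available to the consumer.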