python-asyncio python-anyio

Asyncio as_completed in AnyIO


How can I implement the functionality of asyncio's as_completed in anyio?

I have a messagebus that picks up a user command and directs it to the appropriate handler. That may generate a new domain event that should be picked up by the bus too. Using asyncio.as_completed I can run multiple tasks concurrently, get the results of each task as they complete, check if a new event was generated, and then handle this new event. I would like to use anyio, but don't know how.

This is kind of what I am doing with asyncio:

import asyncio
import itertools

import anyio


async def handle(event: str) -> None:
    """Handle a single event by delegating to ``handle_event``."""
    await handle_event(event)


async def handle_event(event: str):
    """Run the handlers for ``event`` and dispatch any follow-up events.

    The handlers are consumed in completion order via
    ``asyncio.as_completed``. Whenever a handler returns ``"event"``, a
    follow-up is handled recursively inside a fresh anyio task group, which
    is awaited before the next handler result is consumed.
    """
    pending = (
        [slow_2(), slow_5()] if event == "event" else [slow_1(), slow_1()]
    )

    for finished in asyncio.as_completed(pending):
        outcome = await finished
        # NOTE(review): the follow-up payload is a placeholder list, not a
        # str like the ``event`` parameter -- confirm the intended value.
        follow_ups = [["", ""]] if outcome == "event" else []
        if not follow_ups:
            continue
        async with anyio.create_task_group() as group:
            for follow_up in follow_ups:
                group.start_soon(handle_event, follow_up)


async def spin(msg: str) -> None:
    """Animate a text spinner followed by ``msg`` until the task is stopped.

    A new frame is printed every 0.1 seconds on the same terminal line.
    When the task ends for any reason (including cancellation), the status
    line is blanked out before the exception continues to propagate.
    """
    status = ""
    try:
        for char in itertools.cycle(r"\|/-"):
            status = f"\r{char} {msg}"
            print(status, flush=True, end="")
            await anyio.sleep(0.1)
    finally:
        # Cancellation is delivered as a BaseException subclass, so an
        # ``except Exception`` clause never sees it. ``finally`` guarantees
        # the line is cleared and lets the cancellation propagate, as anyio
        # requires.
        blanks = " " * len(status)
        print(f"\r{blanks}\r", end="")


async def slow_1() -> None:
    """Sleep for 1 second, then print ``slow_1``."""
    await anyio.sleep(1)
    print("slow_1")


async def slow_2() -> str:
    """Sleep for 2 seconds, print ``slow_2`` and return ``"event"``."""
    await anyio.sleep(2)
    print("slow_2")
    return "event"


async def slow_5() -> None:
    """Sleep for 5 seconds, then print ``slow_5``."""
    await anyio.sleep(5)
    print("slow_5")


async def supervisor():
    """Show the spinner while the initial event is being handled.

    ``spin`` is started in a task group, ``handle("event")`` is awaited
    inside a shielded cancel scope, and the task group's cancel scope is
    cancelled afterwards so the spinner task stops.
    """
    async with anyio.create_task_group() as group:
        with anyio.CancelScope(shield=True):
            group.start_soon(spin, "thinking!")
            await handle("event")
            group.cancel_scope.cancel()


# Script entry point: run the supervisor coroutine function with anyio.
if __name__ == "__main__":
    anyio.run(supervisor)

Solution

  • There are a few ways you could do this, but here's one that should have almost the same API:

    from collections.abc import Awaitable, Iterable
    from typing import TypeVar
    
    import anyio
    from anyio import create_memory_object_stream
    from anyio.abc import TaskGroup
    
    T = TypeVar("T")
    
    
    def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> Iterable[Awaitable[T]]:
      """Return awaitables that resolve to the results of ``aws`` as they finish.

      Every awaitable in ``aws`` is started in ``tg`` and sends its result
      over a memory object stream. The returned iterable holds one awaitable
      per input; awaiting each one receives the next result from that stream.
      """
      send_stream, receive_stream = create_memory_object_stream()

      async def populate_result(a: Awaitable[T]):
        await send_stream.send(await a)

      async def wait_for_result() -> T:
        return await receive_stream.receive()

      # Count the tasks while spawning them: ``aws`` may be a one-shot
      # iterator, so it cannot be iterated a second time to size the output.
      task_count = 0
      for a in aws:
        tg.start_soon(populate_result, a)
        task_count += 1

      return (wait_for_result() for _ in range(task_count))
    
    async def main():
      """Example: consume handler results one by one in completion order."""
      async with anyio.create_task_group() as tg:
        # Uses the helpers defined in the question (slow_1, slow_2, slow_5).
        coroutines = [slow_1(), slow_2(), slow_5()]
        for coroutine in as_completed(tg, coroutines):
          result = await coroutine
          # do stuff with result
    
    # Run the main() coroutine function with anyio.
    anyio.run(main)
    

    If you don't mind changing the API slightly, we can simplify a bit more:

    from collections.abc import Awaitable, Iterable, AsyncIterable
    from typing import TypeVar
    
    import anyio
    from anyio import create_memory_object_stream
    from anyio.abc import TaskGroup
    
    T = TypeVar("T")
    
    
    def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> AsyncIterable[T]:
      """Return an async iterable of the results of ``aws`` in completion order.

      Every awaitable in ``aws`` is started in ``tg`` with its own clone of
      the send stream. The returned receive stream yields each result as it
      arrives and ends once all clones have been closed.
      """
      send_stream, receive_stream = create_memory_object_stream()

      async def populate_result(a: Awaitable[T], stream):
        # Close this task's clone when done (also on error or cancellation)
        # so the receiving side can detect that no more results will come.
        async with stream:
          await stream.send(await a)

      try:
        for a in aws:
          tg.start_soon(populate_result, a, send_stream.clone())
      finally:
        # Only the clones stay open now; without this close the receive
        # stream would never end and ``async for`` would wait forever.
        send_stream.close()

      return receive_stream
    
    async def main():
      """Example: iterate over handler results directly in completion order."""
      async with anyio.create_task_group() as tg:
        # Uses the helpers defined in the question (slow_1, slow_2, slow_5).
        coroutines = [slow_1(), slow_2(), slow_5()]
        async for result in as_completed(tg, coroutines):
          # do stuff with result
          ...  # placeholder: a loop body needs at least one statement
    
    # Run the main() coroutine function with anyio.
    anyio.run(main)
    

    I chose to use a TypeVar named T everywhere, but you could consider using Any instead. This would mean you can use this with mixed coroutine types.

    DISCLAIMER: I haven't actually run this code, but the approach should work just fine with minor modifications if necessary.