python, python-3.x, python-asyncio

How to flatten a bunch of asynchronous generators asynchronously?


I have a bunch of webpages to scrape. Their addresses differ only in page number, so they can be processed in parallel using aiohttp.

Now I am using an asynchronous function to process these webpages; each call takes one address as its argument and returns a flat list of strings. I am passing all of these URLs at once, and I want a flat list of all the strings from every call. I don't care about the order of the strings; I want each string as soon as it is yielded, regardless of whether other calls have completed, and I don't want to concatenate the results.

I just can't make it work.

This is a Minimal Reproducible Example that illustrates the same problem:

import asyncio

async def test(n):
    await asyncio.sleep(0.5)
    for i in range(1, 11):
        yield n * i

async def run_test():
    ls = []
    for i in range(10):
        async for j in test(i):
            ls.append(j)
    return ls

asyncio.run(run_test())

The above code runs, but doesn't produce the expected result: it waits 5 seconds (the generators run one after another) instead of 0.5 seconds, and every run produces the same output.

I have tried this:

async def run_test():
    ls = []
    for t in asyncio.as_completed([test(i) for i in range(10)]):
        for i in await t:
            ls.append(i)
    return ls

But it also doesn't work:

TypeError: An asyncio.Future, a coroutine or an awaitable is required

This doesn't work, either:

import asyncio

async def test(n):
    await asyncio.sleep(0.5)
    for i in range(1, 11):
        yield n * i
        
async def run_test():
    ls = []
    for x in await asyncio.gather(*(test(i) for i in range(10))):
        for j in x:
            ls.append(j)
    return ls

asyncio.run(run_test())
TypeError: An asyncio.Future, a coroutine or an awaitable is required

I know I can do it like this:

import asyncio

async def test(n):
    await asyncio.sleep(0.5)
    return [n * i for i in range(1, 11)]

async def run_test():
    ls = []
    for x in asyncio.as_completed([test(i) for i in range(10)]):
        ls.extend(await x)
    return ls

asyncio.run(run_test())

But as I specifically stated above, I want to use asynchronous generators.

So how can I yield from asynchronous generators concurrently?


Perhaps my wording isn't specific enough.

I meant that in the first example, every time I run run_test() it outputs:

[
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
     1,  2,  3,  4,  5,  6,  7,  8,  9, 10,
     2,  4,  6,  8, 10, 12, 14, 16, 18, 20,
     3,  6,  9, 12, 15, 18, 21, 24, 27, 30,
     4,  8, 12, 16, 20, 24, 28, 32, 36, 40,
     5, 10, 15, 20, 25, 30, 35, 40, 45, 50,
     6, 12, 18, 24, 30, 36, 42, 48, 54, 60,
     7, 14, 21, 28, 35, 42, 49, 56, 63, 70,
     8, 16, 24, 32, 40, 48, 56, 64, 72, 80,
     9, 18, 27, 36, 45, 54, 63, 72, 81, 90,
]

And that would be the expected result if the code were to run synchronously.

I guess people assumed that I wanted numbers from 0 to 99, I don't know what gave people that idea.

Of course I can do this:

[
    10 * i + j
    for i in range(10)
    for j in range(10)
]

But why would I use that over list(range(100))?

The point is that the output of each function doesn't matter; I just want to collect the entries as soon as they become available.

This is a slightly more complicated example. It gives different output each time it is run; it is synchronous, of course, but it demonstrates what I want to achieve asynchronously:

import random

def test(n):
    for i in range(1, 11):
        yield n * i


def run_test():
    gens = [test(i) for i in range(10)]
    ls = []
    while gens:
        gen = random.choice(gens)
        try:
            ls.append(next(gen))
        except StopIteration:
            gens.remove(gen)
    return ls

run_test()

Solution

  • The thing is that the asynchronous yielding of each item is what has to be driven in parallel - so calls like asyncio.gather / .wait / .as_completed are ineffective on their own: they await whole tasks in parallel.

    The async for statement can't do it alone: it awaits each element of a single generator, and in the code as you presented it, the async for for the next generator only starts running after the previous generator is exhausted.

    The easiest way to do it is to wrap each async for block in its own task. For that, it has to live in an async function - a nested function works, so you can still access the variables of the enclosing run_test (in this case, appending results to the ls list). Each call to this inner async function is then scheduled as a task, so the loops run in parallel, and any of .gather/.wait/.as_completed can be used on them. I've used a TaskGroup in the example below - on Python 3.11 or above it should be preferable to gather (but only if none of the tasks will ever raise an exception; otherwise the TaskGroup is cancelled as a whole).

    (I also added some timing jitter between the elements inside each generator):

    import random
    import asyncio
    
    async def test(n):
        await asyncio.sleep(0.5)
        for i in range(10):
            yield 10 * n + i
            await asyncio.sleep(random.random() / 10)
    
    async def run_test():
        ls = []

        async def mid_task(i):
            # each task consumes one generator, appending to the shared list
            async for j in test(i):
                ls.append(j)

        async with asyncio.TaskGroup() as tg:
            for i in range(10):
                tg.create_task(mid_task(i))

        return ls
    
    print(asyncio.run(run_test()))
    

    And the result of running this is:

    $ time python xizze.py 
    [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 21, 31, 1, 51, 91, 22, 92, 61, 11, 71, 41, 2, 3, 81, 52, 32, 23, 42, 43, 12, 72, 53, 4, 33, 24, 93, 73, 34, 82, 44, 62, 83, 63, 13, 35, 5, 14, 54, 74, 45, 75, 64, 94, 25, 15, 84, 85, 65, 76, 6, 36, 77, 16, 46, 55, 86, 7, 47, 95, 26, 27, 87, 96, 56, 66, 37, 57, 97, 17, 28, 38, 78, 58, 88, 8, 18, 48, 19, 67, 39, 49, 29, 89, 79, 98, 68, 9, 69, 59, 99]
    
    real    0m1.157s
    

    (the 0.5 s at the start of each generator, plus ~0.05 s on average between elements; since the generators run concurrently, that is roughly 0.5 + 10 × 0.05 ≈ 1 s in total).

    Without wrapping the generator consumption in a small function that can itself be a task, the other way to do it is to call and await anext on each generator until they are all exhausted - this is possible, but IMO it just adds complexity. Note that if you don't want a nested function, you can declare it before run_test and pass ls to it as a parameter.

    Without wrapping the async for in a function:

    Mostly for illustrative purposes, here is example code that opts to drive the concurrent generators "manually" with anext calls instead of creating concurrent tasks that wrap the iteration of each generator. As you can see, the code gets much more complex and error-prone - and since the complexity is inside one function, the logic is very hard to unit-test (this version only came out after two or three previous, incorrect attempts).

    import random
    import asyncio
    
    async def test(n):
        await asyncio.sleep(0.5)
        for i in range(10):
            yield 10 * n + i
            await asyncio.sleep(random.random() / 10)
    
    _SENTINEL = object()
    
    async def run_test():
        ls = []
        tasks = set()
        generators = {test(i) for i in range(10)}
        ready_generators = generators.copy()
        while tasks or ready_generators:
            for gen in ready_generators:
                task = asyncio.create_task(anext(gen, _SENTINEL))
                # When a task produces a result, we must know from WHICH generator
                # it came, so that we can schedule a task for its next result.
                # Fortunately, Python allows adding extraneous attributes to a task
                # instance - but strict type checkers for annotated code will
                # complain about this:
                task.generator = gen
                tasks.add(task)
    
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            ready_generators = set()
            for task in done:
                result = task.result()
                if result is _SENTINEL:
                    continue
                ready_generators.add(task.generator)
                ls.append(result)
            tasks = pending
        return ls
    
    
    print(asyncio.run(run_test()))
    

    (It produces the same kind of result, timing included, as the code above.)