
asynchronous python itertools chain multiple generators


UPDATED QUESTION FOR CLARITY:

Suppose I have two processing generator functions:

def gen1(): # just an example;
  yield 1   # the yields actually carry
  yield 2   # different computational weight
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

I can chain them with itertools:

from itertools import chain

mix = chain(gen1(), gen2())

and then wrap it in another generator function:

def mix_yield():
   for item in mix:
      yield item

or, if I just want the next item, I can simply call next(mix).
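For comparison, itertools.chain behaves roughly like the pure-Python generator below (a sketch modeled on the equivalent shown in the itertools documentation; `my_chain` is a hypothetical name). It exhausts each source in order, which is the behavior the async version needs to reproduce:

```python
from itertools import chain

def gen1():
    yield 1
    yield 2
    yield 3

def gen2():
    yield 4
    yield 5
    yield 6

def my_chain(*iterables):
    # Exhaust each iterable in turn, in the order given
    for iterable in iterables:
        for item in iterable:
            yield item

mix = my_chain(gen1(), gen2())
first = next(mix)   # pulls 1 from gen1
rest = list(mix)    # remaining items: the rest of gen1, then all of gen2

# Sanity check against the real itertools.chain
same = list(my_chain(gen1(), gen2())) == list(chain(gen1(), gen2()))
print(first, rest, same)
```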

My question is: how can I do the equivalent in asynchronous code?

Because I need it to:

PREV. UPDATE:

After experimenting and researching, I found the aiostream library, which describes itself as an async version of itertools, so here is what I did:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(), gen2())

async def a_mix_yield():
   async for item in a_mix:   # async iterables need `async for`, not `for`
      yield item

but I still can't call next(a_mix):

TypeError: 'merge' object is not an iterator

nor next(await a_mix):

raise StreamEmpty()

I can, however, still turn it into a list:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
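For reference, collecting an async generator into a list does not strictly require aiostream: an async comprehension (standard library, Python 3.6+) does the same job. A minimal sketch with async generators shaped like the ones above; note that consuming them one after the other gives sequential, chain-like order rather than the interleaving produced by merge:

```python
import asyncio

async def gen1():
    for value in (1, 2, 3):
        await asyncio.sleep(0)   # give control back to the event loop
        yield value

async def gen2():
    for value in (4, 5, 6):
        await asyncio.sleep(0)
        yield value

async def main():
    # Each async comprehension drives its generator with `async for`
    return [x async for x in gen1()] + [x async for x in gen2()]

result = asyncio.run(main())
print(result)
```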

So one goal is completed; one more to go:


Solution

  • Python's built-in next function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. Python 3.10 added a built-in anext function; before that, the standard library had no such global function (the aiostream library provided one), but one could easily write it:

    async def anext(aiterator):
        return await aiterator.__anext__()
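A small sketch that uses the built-in anext on Python 3.10+ and falls back to a hand-written helper like the one above on older versions (the names `anext_compat` and `_anext_fallback` are hypothetical). It also shows that StopAsyncIteration is raised once the iterator is exhausted:

```python
import asyncio
import builtins

async def _anext_fallback(aiterator):
    # Same idea as the helper above: await the iterator's __anext__
    return await aiterator.__anext__()

# Prefer the built-in (Python 3.10+), otherwise use the fallback
anext_compat = getattr(builtins, "anext", _anext_fallback)

async def numbers():
    yield 1
    yield 2

async def main():
    it = numbers()              # an async generator is its own async iterator
    first = await anext_compat(it)
    second = await anext_compat(it)
    exhausted = False
    try:
        await anext_compat(it)
    except StopAsyncIteration:  # raised once the generator has no more items
        exhausted = True
    return first, second, exhausted

result = asyncio.run(main())
print(result)  # (1, 2, True)
```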
    

    The async iterator is in turn obtained from an async iterable by calling its __aiter__ method (analogous to the __iter__ method of regular iterables). Driving async iteration manually looks like this:

    a_iterator = obj.__aiter__()          # regular method
    elem1 = await a_iterator.__anext__()  # async method
    elem2 = await a_iterator.__anext__()  # async method
    ...
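To see both halves of the protocol in one place, here is a minimal sketch of a class-based async iterator (the `Countdown` class is hypothetical): __aiter__ is an ordinary method that returns the iterator, while __anext__ is a coroutine that returns the next value or raises StopAsyncIteration.

```python
import asyncio

class Countdown:
    """Hypothetical async iterator counting down from `start` to 1."""

    def __init__(self, start):
        self.current = start

    def __aiter__(self):          # regular method: returns the async iterator
        return self

    async def __anext__(self):    # coroutine: produces the next element
        if self.current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)    # stand-in for real asynchronous work
        value = self.current
        self.current -= 1
        return value

async def main():
    a_iterator = Countdown(3).__aiter__()
    elem1 = await a_iterator.__anext__()
    elem2 = await a_iterator.__anext__()
    return elem1, elem2

result = asyncio.run(main())
print(result)  # (3, 2)
```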
    

    __anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators one should use async for.
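Roughly speaking, an async for loop is shorthand for calling __aiter__ once and then awaiting __anext__ until StopAsyncIteration is raised. A simplified sketch of that equivalence (it ignores details such as a loop's else clause; `collect_manually` and `collect_with_async_for` are hypothetical names):

```python
import asyncio

async def agen():
    for value in ("a", "b", "c"):
        await asyncio.sleep(0)
        yield value

async def collect_manually(aiterable):
    # Desugared form of an `async for` loop
    items = []
    a_iterator = aiterable.__aiter__()
    while True:
        try:
            x = await a_iterator.__anext__()
        except StopAsyncIteration:   # signals exhaustion, like StopIteration
            break
        items.append(x)
    return items

async def collect_with_async_for(aiterable):
    items = []
    async for x in aiterable:        # the idiomatic form
        items.append(x)
    return items

async def main():
    return await collect_manually(agen()), await collect_with_async_for(agen())

manual, idiomatic = asyncio.run(main())
print(manual, idiomatic)  # ['a', 'b', 'c'] ['a', 'b', 'c']
```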

    Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:

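    import asyncio
    from aiostream import stream

    # gen1 and gen2 are the async generators from the question
    async def main():
        a_mix = stream.combine.merge(gen1(), gen2())
        async with a_mix.stream() as streamer:  # open the stream in a context
            mix_iter = streamer.__aiter__()
            print(await mix_iter.__anext__())   # first element
            print(await mix_iter.__anext__())   # second element
            print('remaining:')
            async for x in mix_iter:            # exhaust the rest
                print(x)

    asyncio.run(main())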
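If aiostream is not available, or if you want chain semantics (each generator exhausted in order) rather than merge's interleaving, an async counterpart of itertools.chain takes only a few lines of standard-library code. A minimal sketch; `achain` is a hypothetical name, not part of asyncio:

```python
import asyncio

async def gen1():
    for value in (1, 2, 3):
        await asyncio.sleep(0)
        yield value

async def gen2():
    for value in (4, 5, 6):
        await asyncio.sleep(0)
        yield value

async def achain(*aiterables):
    # Async counterpart of itertools.chain: exhaust each source in order
    for aiterable in aiterables:
        async for item in aiterable:
            yield item

async def main():
    mix = achain(gen1(), gen2())
    first = await mix.__anext__()        # or `await anext(mix)` on 3.10+
    second = await mix.__anext__()
    remaining = [x async for x in mix]   # exhaust the rest
    return first, second, remaining

first, second, remaining = asyncio.run(main())
print(first, second, remaining)  # 1 2 [3, 4, 5, 6]
```

Because achain is itself an async generator, it also plays the role of a_mix_yield from the question. Unlike merge, it never pulls from gen2 before gen1 is exhausted, so the output order is deterministic.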