UPDATED QUESTION FOR CLARITY:
suppose I have 2 processing generator functions:
def gen1(): # just for examples,
yield 1 # yields actually carry
yield 2 # different computation weight
yield 3 # in my case
def gen2():
yield 4
yield 5
yield 6
I can chain them with itertools
from itertools import chain
mix = chain(gen1(), gen2())
and then I can create another generator function object with it,
def mix_yield():
for item in mix:
yield item
or simply if I just want to next(mix), it's there.
My question is, how can I do the equivalent in asynchronous code?
Because I need it to:
next iteratorPREV. UPDATE:
After experimenting and researching, I found aiostream library which states as async version of itertools, so what I did:
import asyncio
from aiostream import stream
async def gen1():
await asyncio.sleep(0)
yield 1
await asyncio.sleep(0)
yield 2
await asyncio.sleep(0)
yield 3
async def gen2():
await asyncio.sleep(0)
yield 4
await asyncio.sleep(0)
yield 5
await asyncio.sleep(0)
yield 6
a_mix = stream.combine.merge(gen1(),gen2())
async def a_mix_yield():
for item in a_mix:
yield item
but I still can't do next(a_mix)
TypeError: 'merge' object is not an iterator
or next(await a_mix)
raise StreamEmpty()
Although I still can make it into a list:
print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
so one goal is completed, one more to go:
return in yield (one by one), or with next iterator
- the fastest resolved yield first (async)
Python's next built-in function is just a convenient way of invoking the underlying __next__ method on the object. The async equivalent of __next__ is the __anext__ method on the async iterator. Prior to Python 3.10 there was no anext global function in the standard library (the aiostream library provided one), but one could easily write it:
async def anext(aiterator):
return await aiterator.__anext__()
The async iterator is in turn obtained from an async iterable by calling the __aiter__ (in analogy to __iter__ provided by regular iterables). Async iteration driven manually looks like this:
a_iterator = obj.__aiter__() # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...
__anext__ will raise StopAsyncIteration when no more elements are available. To loop over async iterators one should use async for.
Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:
async def main():
a_mix = stream.combine.merge(gen1(), gen2())
async with a_mix.stream() as streamer:
mix_iter = streamer.__aiter__()
print(await mix_iter.__anext__())
print(await mix_iter.__anext__())
print('remaining:')
async for x in mix_iter:
print(x)
asyncio.get_event_loop().run_until_complete(main())