I am using Python 3.11.5 with the below code:
import asyncio
from collections.abc import AsyncIterable
# Leave this iterable be, the question is about
# how to use many instances of this in parallel
async def iterable() -> AsyncIterable[int]:
    yield 1
    yield 2
    yield 3
# How can one get multiple async iterables to work with asyncio.gather?
# In other words, since asyncio.gather works with asyncio.Task,
# (1) How can one convert an async iterable to a Task?
# (2) How can one use asyncio.gather to run many of these tasks in parallel,
# keeping the results 1-1 with the source iterables?
results_1, results_2, results_3 = asyncio.gather(iterable(), iterable(), iterable())
To restate the question, how can one get an AsyncIterable as an asyncio task, where the task iterates until exhaustion (e.g. for use with asyncio.gather)?
I am looking for a 1 - 3 line snippet showing how to connect these dots.
According to your code snippet, you're passing async generator objects (the results of calling iterable()) directly to asyncio.gather, but it expects awaitables (coroutines, Tasks, or Futures). One possible fix is to write a coroutine that consumes an async iterable and collects its items into a list.
The code will be like this:
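async def collect(async_iterable):
    # Drain the async iterable into a list.
    return [item async for item in async_iterable]

async def main():
    # gather() must be awaited inside a running event loop; it wraps each
    # coroutine in a Task and returns the results in argument order.
    return await asyncio.gather(
        collect(iterable()), collect(iterable()), collect(iterable())
    )

results = asyncio.run(main())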
In this way, you would have three [1, 2, 3] lists, but there is still a problem: the iterable()s do not actually run concurrently. If you add a print inside collect(), you will see that they run one after another. asyncio only switches between tasks at an await that actually suspends, so to interleave your iterables the generator needs at least one such await. Here we can use await asyncio.sleep(0) as a trick, since it simply hands control back to the event loop!
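To make the sequential behaviour visible, here is a minimal sketch that records the order in which items are consumed when the generator contains no suspending await. The names no_suspend, events, and the task labels "a" and "b" are illustrative and not part of the question's code.

```python
import asyncio

events = []  # hypothetical log of (task label, item) pairs

async def no_suspend():
    # Same shape as iterable() in the question: yields only, no suspending await.
    yield 1
    yield 2

async def collect(name, async_iterable):
    items = []
    async for item in async_iterable:
        events.append((name, item))
        items.append(item)
    return items

async def main():
    return await asyncio.gather(
        collect("a", no_suspend()),
        collect("b", no_suspend()),
    )

results = asyncio.run(main())
print(events)   # [('a', 1), ('a', 2), ('b', 1), ('b', 2)]
print(results)  # [[1, 2], [1, 2]]
```

Task "a" drains its whole generator before task "b" gets a turn, because nothing in the generator ever hands control back to the event loop.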
Here's the whole code:
import asyncio
from collections.abc import AsyncIterable
async def iterable() -> AsyncIterable[int]:
    yield 1
    await asyncio.sleep(0)
    yield 2
    await asyncio.sleep(0)
    yield 3

async def collect(async_iterable: AsyncIterable[int]) -> list:
    result = []
    async for item in async_iterable:
        print(item)
        result.append(item)
    return result

async def main():
    tasks = [
        asyncio.create_task(collect(iterable())),
        asyncio.create_task(collect(iterable())),
        asyncio.create_task(collect(iterable()))
    ]
    results = await asyncio.gather(*tasks)
    return results

results_1, results_2, results_3 = asyncio.run(main())
print(results_1, results_2, results_3)
Out:
1
1
1
2
2
2
3
3
3
[1, 2, 3] [1, 2, 3] [1, 2, 3]
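On the second part of the question (keeping results 1-1 with the source iterables): asyncio.gather returns results in the order the awaitables were passed, regardless of which one finishes first. The sketch below illustrates this with a hypothetical numbers() generator whose start value and delay are made up for the example; it also passes the coroutines to gather directly, which wraps them in Tasks itself.

```python
import asyncio
from collections.abc import AsyncIterator

async def numbers(start: int, delay: float) -> AsyncIterator[int]:
    # Hypothetical iterable: yields three ints, pausing `delay` seconds before each.
    for offset in range(3):
        await asyncio.sleep(delay)
        yield start + offset

async def collect(async_iterable):
    # Drain the async iterable into a list.
    return [item async for item in async_iterable]

async def main():
    # The first iterable is the slowest, yet its list still comes back first.
    return await asyncio.gather(
        collect(numbers(10, 0.03)),
        collect(numbers(20, 0.01)),
        collect(numbers(30, 0.02)),
    )

slow, fast, medium = asyncio.run(main())
print(slow, fast, medium)  # [10, 11, 12] [20, 21, 22] [30, 31, 32]
```

So unpacking the gather result positionally, as in the question, is safe even when the iterables take different amounts of time.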