I am receiving a StopAsyncIteration error when iterating over the result of an aggregate inside a TaskGroup, but not otherwise. I have created a minimal example to show the error:
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient


def create_aggregate():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    collection = client["test"]["test"]
    return collection.aggregate([])


async def get_next(agg):
    item = await agg.next()
    print(item)


async def run_1_success():
    agg = create_aggregate()
    await get_next(agg)


async def run_2_success():
    agg = create_aggregate()
    await get_next(agg)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(get_next(agg))
        tg.create_task(get_next(agg))


async def run_3_error():
    agg = create_aggregate()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(get_next(agg))
        tg.create_task(get_next(agg))
When running asyncio.run(run_1_success()) or asyncio.run(run_2_success()), I can successfully iterate over the trivial aggregation, even using TaskGroup. But when running asyncio.run(run_3_error()), I receive a StopAsyncIteration error.
Environment:
Thank you.
From the docs for MotorCollection.aggregate, emphasis mine:
Note that this method returns a MotorCommandCursor which lazily runs the aggregate command when first iterated. In order to run an aggregation with $out or $merge the application needs to iterate the cursor.
In run_2_success, there is an initial await get_next(agg), which executes the aggregation and waits until the cursor is returned. So the cursor is already available to be iterated over inside the TaskGroup's async with block.
But in run_3_error, both tasks share the same agg aggregation object, and it is not yet ready for next: the first task is still waiting for the cursor to be returned while the second is already trying to iterate over it (or vice versa, depending on the order in which the tasks run).
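To make that timing concrete, here is a toy model of the race. ToyLazyCursor is a hypothetical stand-in written only for illustration; it is not Motor's implementation, and it assumes the simplest possible behaviour: the first next() call starts the command, and any call that arrives while the command is still in flight sees an empty buffer.

```python
import asyncio


class ToyLazyCursor:
    """Hypothetical stand-in for a lazily started cursor (not Motor's code)."""

    def __init__(self, docs):
        self._docs = list(docs)
        self._buffer = []
        self._started = False

    async def next(self):
        if not self._started:
            # The first caller kicks off the "command"; results arrive later.
            self._started = True
            await asyncio.sleep(0.01)  # simulated network round trip
            self._buffer = list(self._docs)
        if not self._buffer:
            # A caller that arrives while the command is in flight sees an
            # empty buffer and concludes the cursor is exhausted.
            raise StopAsyncIteration
        return self._buffer.pop(0)


async def race(docs):
    cursor = ToyLazyCursor(docs)
    # Both calls start before the simulated command has returned.
    return await asyncio.gather(
        cursor.next(), cursor.next(), return_exceptions=True
    )


async def primed(docs):
    cursor = ToyLazyCursor(docs)
    first = await cursor.next()  # the command completes here, as in run_2_success
    rest = await asyncio.gather(
        cursor.next(), cursor.next(), return_exceptions=True
    )
    return [first, *rest]
```

In this model, race() gives one document and one StopAsyncIteration, while primed() returns all three documents, which mirrors the difference between run_3_error and run_2_success.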
If I add an await asyncio.sleep(1) between the two tasks in run_3_error, then it works.
Also, you don't need a get_next(agg) function. MotorCollection.aggregate returns a MotorCommandCursor that can be iterated like a cursor from find():
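A minimal sketch of that iteration style follows. fake_cursor is a hypothetical stand-in so the snippet runs without a database; with Motor you would pass collection.aggregate([...]) to read_all instead. The item names are borrowed from the sample output further down.

```python
import asyncio


async def fake_cursor():
    # Hypothetical stand-in for collection.aggregate([...]); any async
    # iterable behaves the same way under `async for`.
    for item in ("notebook", "paper", "postcard"):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield {"item": item}


async def read_all(cursor):
    # One consumer drives the cursor; no explicit next() calls are needed.
    items = []
    async for doc in cursor:
        items.append(doc["item"])
    return items
```

Because a single loop owns the cursor, there is no window in which two callers can ask for the next document at the same time.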
Side note: You should not be trying to iterate over the results of one cursor in parallel. If you want to do something asynchronously with the results, it should be something like:
async def do_something(res):
    # Single quotes inside the f-string keep this valid before Python 3.12.
    print(f"doing something with: {res['item']}")
    await asyncio.sleep(.1)
    print(f"done with: {res['item']}")


async def run_4_okay():
    agg = create_aggregate()
    async with asyncio.TaskGroup() as tg:
        # Only this loop touches the cursor; the tasks work on the results.
        async for result in agg:
            tg.create_task(do_something(result))
(Append those to a tasks list to track results later.)
Output, using this MongoDB docs collection:
doing something with: notebook
doing something with: paper
doing something with: postcard
done with: paper
done with: postcard
done with: notebook