pythongeneratorpython-asyncio

How to forcefully close an async generator?


Let's say I have an async generator like this:

async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            event = await queue.get()
            yield event
        else:
            return

I consume it like this:

published_events = event_publisher(connection, queue)
async for event in published_events:
    # do event processing here

It works just fine, however when the connection is disconnected and there is no new event published the async for will just wait forever, so ideally I would like to close the generator forcefully like this:

if connection.is_disconnected():
    await published_events.aclose()

But I get the following error:

RuntimeError: aclose(): asynchronous generator is already running

Is there a way to stop processing of an already running generator?


Solution

  • It seems to be related to this issue. Noticable:

    As shown in https://gist.github.com/1st1/d9860cbf6fe2e5d243e695809aea674c, it's an error to close a synchronous generator while it is being iterated.

    ...

    In 3.8, calling "aclose()" can crash with a RuntimeError. It's no longer possible to reliably cancel a running asynchrounous generator.

    Well, since we can't cancel running asynchrounous generator, let's try to cancel its running.

    import asyncio
    from contextlib import suppress
    
    
    async def cancel_gen(agen):
        task = asyncio.create_task(agen.__anext__())
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
        await agen.aclose()  # probably a good idea, 
                             # but if you'll be getting errors, try to comment this line
    

    ...

    if connection.is_disconnected():
        await cancel_gen(published_events)
    

    Can't test if it'll work since you didn't provide reproducable example.