pythonsubprocesspython-asyncio

Monitor `asyncio.create_subprocess_exec` pipes for errors


I am trying to pipe multiple Linux commands and abort if there is an error. With Popen the communicate() method waits for all commands to finish. This is why I am trying asyncio now.

I have the following MWE working as expected if there is no error:

async def forward(src, dest):
    """Read data from src and write it to dest."""
    while True:
        chunk = await src.read(4096)
        if not chunk:
            dest.write_eof()
            await dest.drain()
            break
        dest.write(chunk)
        await dest.drain()

async def stderr_watch(stream):
    err = await stream.read()
    if err.strip():
        raise RuntimeError(f"stderr: {err.decode()}")

async def main():
    p1 = await asyncio.create_subprocess_exec(
        "find","/", "-name", "*.py",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    p2 = await asyncio.create_subprocess_exec(
        "wc", "-l",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    output = []
    async def stream_reader(stream):
        while True:
            line = await stream.readline()
            if not line:
                break
            output.append(line.decode())


    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(stderr_watch(p1.stderr))
            t2 = tg.create_task(stderr_watch(p2.stderr))
            t3 = tg.create_task(forward(p1.stdout, p2.stdin))
            t4 = tg.create_task(stream_reader(p2.stdout))

    except* Exception as eg:
        for e in eg.exceptions:
            print(e)
            pass
    else:
        return "".join(output)



if __name__ == '__main__':
    output = asyncio.run(main())
    print(output)

However, I am getting a RuntimeError: Event loop is closed if I create an exeption in p2, eg. by piping to "wc", "-l", "abc",. Where is my mistake?


Solution

  • The error occurs because the finalizer of the internal object runs after the event loop is finished. (It's the asyncio.base_subprocess.BaseSubprocessTransport.) You need to terminate the child processes on error, and await on them, like this.

    async def main():
        p1 = await asyncio.create_subprocess_exec(
            ...
        )
    
        p2 = await asyncio.create_subprocess_exec(
            ...
        )
        ...
        try:
            async with asyncio.TaskGroup() as tg:
            ...
        except* Exception as eg:
            ...
            for p in (p1, p2):
                try:
                    p.terminate()
                except Exception:
                    pass
        for p in (p1, p2):
            await p.communicate()