python-3.x python-asyncio python-multithreading

How to cancel an await asyncio.StreamReader.readline() call?


I am using asyncio.StreamReader to continuously process line-based communication like this:

reader, writer = await asyncio.open_connection(host, port)
while running:
    line = await reader.readline()
    process(line)

The call to readline() blocks until a line is received. I want to be able to abort that wait, so I wrapped the call in a Task:

reader, writer = await asyncio.open_connection(host, port)
while running:
    read_task = loop.create_task(reader.readline())
    await read_task
    line = read_task.result()
    process(line)

so that, from somewhere else, I can call

read_task.cancel()

to stop the blocking readline() call. But this does not work.

The Question

How can I abort or cancel a blocking await asyncio.StreamReader.readline() call?

Minimal reproducible example

import asyncio
import socket
import threading
import time

read_task: asyncio.Task = None

async def wait_for_data():
    global read_task

    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Create a task that wraps the blocking readline() call
    read_task = loop.create_task(reader.readline())
    print("Waiting for line...")
    await read_task
    data = read_task.result()

    # ==== Never get here ====
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

def thread_entry():
    asyncio.run(wait_for_data())

if __name__ == '__main__':

    thread = threading.Thread(target=thread_entry)
    thread.start()

    time.sleep(2)  # Give the thread some time to start
    print("Canceling read task")
    read_task.cancel()  # Cancel the read task to simulate a timeout

    thread.join()
    
    print("Done.")

Workaround

One (IMHO ugly) workaround is to use asyncio.wait_for with a timeout:

reader, writer = await asyncio.open_connection(host, port)
while running:
    try:
        line = await asyncio.wait_for(reader.readline(), timeout=5)
    except asyncio.TimeoutError:
        if not running:  # Redundant, but kept for clarity
            break
        continue
    process(line)
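
If the code that wants to stop the read runs in the same event loop, one way to avoid polling with a timeout is to wait on readline() and a stop signal at the same time. The following is only a sketch under that assumption: read_lines, demo, stop, and handle are hypothetical names that do not appear in the code above, and a socket pair stands in for the real connection.

```python
import asyncio
import socket


async def read_lines(reader, stop, handle):
    """Pass each line to handle() until stop is set or EOF is reached."""
    stop_wait = asyncio.create_task(stop.wait())
    try:
        while not stop.is_set():
            read = asyncio.create_task(reader.readline())
            # Wake up when either a line arrives or a stop is requested.
            await asyncio.wait({read, stop_wait},
                               return_when=asyncio.FIRST_COMPLETED)
            if not read.done():
                read.cancel()  # stop was requested while waiting for a line
                break
            line = read.result()
            if not line:  # empty bytes object means EOF
                break
            handle(line)
    finally:
        stop_wait.cancel()


async def demo():
    # A connected socket pair stands in for a real host/port connection.
    rsock, wsock = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=rsock)
    stop = asyncio.Event()
    received = []
    worker = asyncio.create_task(read_lines(reader, stop, received.append))

    wsock.sendall(b"hello\n")
    while not received:  # wait until the worker has handled the line
        await asyncio.sleep(0.01)

    stop.set()  # abort the pending readline() without any timeout
    await worker

    writer.close()
    await writer.wait_closed()
    wsock.close()
    return received
```

Running asyncio.run(demo()) returns the single line that was sent before the stop request. This only helps when the stop request originates in the loop's own thread; asyncio.Event is not thread-safe, so a request coming from another thread still has to be handed over to the loop.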

Solution

  • You are attempting to cancel a task from a thread other than the one in which the task's event loop is running. This does not work because asyncio tasks and event loops are not thread-safe: the loop is never woken up to process the cancellation. Instead, call asyncio.run_coroutine_threadsafe from the parent thread to run a coroutine in the child thread's event loop (which must be made accessible to the parent thread). Also, rather than sleeping for 2 seconds and hoping that the read task has been created by then, use a threading.Event instance to signal its creation:

    import asyncio
    import socket
    import threading
    
    read_task: asyncio.Task = None
    loop: asyncio.AbstractEventLoop = None  # Set by the child thread, used by the parent
    read_task_created = threading.Event()
    
    async def wait_for_data():
        global read_task, loop
    
        loop = asyncio.get_running_loop()
    
        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()
        reader, writer = await asyncio.open_connection(sock=rsock)
    
        # Create a task that wraps the blocking readline() call and signal its creation:
        read_task = loop.create_task(reader.readline())
        read_task_created.set()
    
        print("Waiting for line...")
        try:
            await read_task
        except asyncio.CancelledError:
            print('read task canceled (1)')
        else:
            # If read completes normally:
            data = read_task.result()
            print("Received:", data.decode())
    
        writer.close()
        await writer.wait_closed()
    
        # Close the second socket
        wsock.close()
    
    def thread_entry():
        asyncio.run(wait_for_data())
    
    if __name__ == '__main__':
    
        thread = threading.Thread(target=thread_entry)
        thread.start()
    
        read_task_created.wait()  # Wait until the read task has been created
    
        async def cancel_task():
            read_task.cancel()
            try:
                await read_task
            except asyncio.CancelledError:
                print('read task canceled (2)')
    
        print("Canceling read task")
        future = asyncio.run_coroutine_threadsafe(cancel_task(), loop)
        future.result()
        thread.join()
    
        print("Done.")
    

    Prints:

    Waiting for line...
    Canceling read task
    read task canceled (1)
    read task canceled (2)
    Done.
    

    Note

    The cancel_task function does not need to await the canceled task; it can simply be:

        ...
    
        async def cancel_task():
            read_task.cancel()
    
        ...
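
As a variation on the note above, the cancellation can also be scheduled without a helper coroutine by using loop.call_soon_threadsafe, the thread-safe counterpart of call_soon. The following is a minimal sketch, not part of the original answer: run_demo, state, and ready are illustrative names, and a dictionary replaces the globals so that the example is self-contained.

```python
import asyncio
import socket
import threading


def run_demo():
    """Cancel a pending readline() from another thread and return the outcome."""
    ready = threading.Event()
    state = {}  # shared between the threads: loop, task, outcome

    async def wait_for_data():
        state["loop"] = asyncio.get_running_loop()
        rsock, wsock = socket.socketpair()
        reader, writer = await asyncio.open_connection(sock=rsock)
        state["task"] = asyncio.create_task(reader.readline())
        ready.set()  # loop and task are now visible to the parent thread
        try:
            await state["task"]
            state["outcome"] = "received"
        except asyncio.CancelledError:
            state["outcome"] = "cancelled"
        finally:
            writer.close()
            await writer.wait_closed()
            wsock.close()

    thread = threading.Thread(target=lambda: asyncio.run(wait_for_data()))
    thread.start()
    ready.wait()

    # Ask the loop's own thread to run task.cancel(); this also wakes the loop.
    state["loop"].call_soon_threadsafe(state["task"].cancel)

    thread.join()
    return state["outcome"]
```

Calling run_demo() should return "cancelled", because nothing is ever written to the socket pair. Unlike run_coroutine_threadsafe, call_soon_threadsafe returns a handle rather than a future, so the parent thread cannot wait on a result; here it simply joins the child thread.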