I am using asyncio.StreamReader to continuously process line-based communication like this:
reader, writer = await asyncio.open_connection(host, port)
while running:
line = await reader.readline()
process(line)
The call to readline() blocks till a line is received. I want to be able to abort the blocking, so I wrapped the call into a Task:
reader, writer = await asyncio.open_connection(host, port)
while running:
read_task = loop.create_task(reader.readline())
await read_task
line = read_task.result()
process(line)
so that, from somewhere else, I can call
read_task.cancel()  # request cancellation of the Task that wraps reader.readline()
to stop the blocking readline() call. But this does not work.
How can I abort or cancel a blocking await asyncio.StreamReader.readline() call?
import asyncio
import socket
import threading
import time
# Handle to the readline() task: assigned inside wait_for_data() and used by
# the main thread to call cancel(). It is None until the task has been created.
read_task: "asyncio.Task | None" = None
async def wait_for_data():
    """Open a stream over a local socket pair and wait for one line on it.

    The readline() call is wrapped in a Task that is stored in the
    module-level ``read_task`` so that code outside this coroutine can reach
    it. Nothing in this script ever writes to the other end of the pair.
    """
    global read_task

    event_loop = asyncio.get_running_loop()

    # Two connected sockets: the stream reads from one end, the other end
    # is only closed at the very end of this coroutine.
    read_end, write_end = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=read_end)

    # Wrap the blocking readline() call in a task.
    read_task = event_loop.create_task(reader.readline())

    print("Waiting for line...")
    # Awaiting the task yields the same value as read_task.result().
    data = await read_task

    # ==== Never get here ====
    print("Received:", data.decode())

    writer.close()
    await writer.wait_closed()

    # Close the second socket
    write_end.close()
def thread_entry():
    """Thread target: run wait_for_data() on an event loop created by asyncio.run()."""
    asyncio.run(wait_for_data())
if __name__ == '__main__':
    # The asyncio code runs on a separate thread; see thread_entry().
    worker = threading.Thread(target=thread_entry)
    worker.start()

    time.sleep(2)  # Give the thread some time to start

    print("Canceling read task")
    # This call is made from the main thread, not from the thread whose
    # event loop runs the task.
    read_task.cancel()  # Cancel the read task to simulate a timeout

    worker.join()
    print("Done.")
One (IMHO ugly) workaround is to use asyncio.wait_for and use a timeout:
reader, writer = await asyncio.open_connection(host, port)
while running:
try:
line = await asyncio.wait_for(reader.readline(), timeout=5)
except asyncio.TimeoutError:
if not running: # Redundant but for clarity
break
continue
process(line)
You are attempting to cancel a task from a thread other than the one in which the task's event loop is running. This will not work, because asyncio tasks and event loops are not thread-safe. Instead, use asyncio.run_coroutine_threadsafe in the parent thread to have a coroutine run in the child thread's event loop (which needs to be made accessible to the parent thread). Also, rather than sleeping 2 seconds to ensure that the read task has been created, we should use a threading.Event instance:
import asyncio
import socket
import threading
# Handle to the readline() task: assigned inside wait_for_data() and
# cancelled from the main thread through cancel_task().
read_task: "asyncio.Task | None" = None
# Set by wait_for_data() right after read_task is assigned; the main thread
# waits on it before touching read_task.
read_task_created = threading.Event()
async def wait_for_data():
    """Wait for one line on a local socket pair, tolerating cancellation of the read.

    Publishes the running event loop and the readline() task through the
    module-level names ``loop`` and ``read_task``, then sets
    ``read_task_created`` so that another thread knows both are available.
    The stream and the second socket are closed however the read ends.
    """
    global read_task, loop

    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    reader, writer = await asyncio.open_connection(sock=rsock)
    try:
        # Create a task to call blocking readline and signal its creation:
        read_task = loop.create_task(reader.readline())
        read_task_created.set()

        print("Waiting for line...")
        try:
            data = await read_task
        except asyncio.CancelledError:
            print('read task canceled (1)')
        else:
            # If read completes normally:
            print("Received:", data.decode())
    finally:
        # Runs even if the read fails with an unexpected exception, so the
        # stream and the second socket are not leaked.
        writer.close()
        try:
            await writer.wait_closed()
        finally:
            # Close the second socket
            wsock.close()
def thread_entry():
    """Thread target: run wait_for_data() on an event loop created by asyncio.run()."""
    asyncio.run(wait_for_data())
if __name__ == '__main__':
    worker = threading.Thread(target=thread_entry)
    worker.start()

    read_task_created.wait()  # Wait for read task to start

    async def cancel_task():
        """Cancel the read task and wait until the cancellation has taken effect."""
        read_task.cancel()
        try:
            await read_task
        except asyncio.CancelledError:
            print('read task canceled (2)')

    print("Canceling read task")
    # Schedule cancel_task() on the worker thread's event loop and block
    # this thread until it has finished.
    asyncio.run_coroutine_threadsafe(cancel_task(), loop).result()

    worker.join()
    print("Done.")
Prints:
Waiting for line...
Canceling read task
read task canceled (1)
read task canceled (2)
Done.
Note: the function cancel_task is not required to await the canceled task and can instead simply be:
...
async def cancel_task():
    # Only request the cancellation; do not wait for the task to finish.
    read_task.cancel()
...