I have a use-case where there are a lot of TCP/IP clients being bundled up into a single Python process. The current desire is to use asyncio
to provide concurrency for the program. For all new TCP/IP clients, asyncio
streams will be used, and for all new HTTP clients, aiohttp
will be used.
However, there are several existing clients that are written using Python's socket
module. My question is: how should you "wrap" the existing socket
-based classes and methods with async
?
Example existing TCP/IP client:
import socket
class ExistingClient:
def __init__(self, host: str, port: int) -> None:
self.__host = host
self.__port = port
self.__socket: socket.socket | None = None
def initialize(self) -> None:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.connect((self.__host, self.__port))
def get_status(self) -> int:
self.__socket.send("2\n".encode())
data: str = str(self.__socket.recv(1024).decode()).strip()
return int(data)
def close(self) -> None:
self.__socket.close()
You can run this server script to serve as an example for the below client script. Save in server.py
.
import asyncio
import functools
class State:
def __init__(self) -> None:
self.__count: int = 0
def add_to_count(self, count: int) -> None:
self.__count = self.__count + count
@property
def count(self) -> int:
return self.__count
async def handle_echo(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter, state: State
):
# Ignore the use of the infinite while loop for this example.
# Controlling the loop would be handled in a more sophisticated way.
while True:
data = await reader.readline()
message: int = int(data.decode())
state.add_to_count(message)
addr = writer.get_extra_info("peername")
writer.write(f"{state.count}\n".encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def server(port: int):
state = State()
partial = functools.partial(handle_echo, state=state)
server = await asyncio.start_server(partial, "127.0.0.1", port)
address = ", ".join(str(sock.getsockname()) for sock in server.sockets)
print(f"Serving on {address}")
async with server:
await server.serve_forever()
async def main():
await asyncio.gather(
server(8888),
server(8889),
)
asyncio.run(main())
To me, one solution seems to be that the existing class(es) could be editing with async
methods that simply defer down to the blocking I/O socket
calls. For example:
import socket
class ExistingClient:
def __init__(self, host: str, port: int) -> None:
self.__host = host
self.__port = port
self.__socket: socket.socket | None = None
def initialize(self) -> None:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.connect((self.__host, self.__port))
async def async_initialize(self) -> None:
self.initialize()
def get_status(self) -> int:
self.__socket.send("2\n".encode())
data: str = str(self.__socket.recv(1024).decode()).strip()
return int(data)
async def async_get_status(self) -> int:
return self.get_status()
def close(self) -> None:
self.__socket.close()
async def async_close(self) -> None:
self.close()
Does this work in the sense of not blocking the asyncio
event loop such that it behaves very similarly to asyncio
streams calls? As an example of how I envision something like this getting integrated with normal asyncio
code (saved in the same file as the directly above modified ExistingClient
with the async
methods in say client.py
):
import asyncio
async def streams_tcp_client():
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
writer.write("1\n".encode())
await writer.drain()
data: bytes = await reader.readline()
print(f"asyncio streams data: {data.decode().strip()}")
writer.close()
await writer.wait_closed()
async def existing_tcp_client1():
existing_client = ExistingClient("127.0.0.1", 8889)
await existing_client.async_initialize()
data: int = await existing_client.async_get_status()
print(f"Existing client data: {data}")
await existing_client.async_close()
async def main():
await asyncio.gather(
streams_tcp_client(),
existing_tcp_client1(),
)
Does this work as one might expect it to such that the ExistingClient
async
calls, that contain blocking I/O calls, do not block the asyncio
event loop?
I have ran this code, and it runs and prints out the expected data. But it is unclear how to test if the event loop is running as expected or desired.
I have seen some mention of asyncio.to_thread
. In the documentation, it states:
This coroutine function is primarily intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread.
However, this does not really explain anything. And it doesn't explain why simply defining
async def async_blocking_io():
blocking_io()
is not enough to not block the event loop. Because the actual blocking I/O, such as a TCP/IP write and read operation, shouldn't block the event loop's thread, correct?
Would the way to use this be the following:
async def existing_tcp_client2():
existing_client = ExistingClient("127.0.0.1", 8889)
await asyncio.to_thread(existing_client.initialize)
data: int = await asyncio.to_thread(existing_client.get_status)
print(f"Existing client data with to_thread: {data}")
await asyncio.to_thread(existing_client.close)
This runs if I modify main
to be:
async def main():
await asyncio.gather(
streams_tcp_client(),
existing_tcp_client1(),
)
What is the difference between making async def async_<existing_method>
methods vs the asyncio.to_thread
method? The asyncio.to_thread
method is concerning, because each call would be ran in a new thread? That could be an issue for thread-unsafe classes and also creates overhead by constantly spawning new threads.
What are the other solutions to this problem?
Solution 1 - simply wrapping synchronous functions with an async
wrapper - will still block your main process. In a TCP server/client you could try to work around this by rewriting the lower level functions where you send/read data to/from the socket, making those lower level functions also async, processing the communication in smaller chunks. But if you don't do that, then solution 1 cannot give you more concurrency.
If you simply define
async def async_blocking_io():
blocking_io()
then that is going to block the process. Even if you add a formal await statement like this:
async def async_blocking_io():
await asyncio.sleep(0) # first gives other tasks a chance
blocking_io()
Now other co-routines might run first, but you're still going to block in the synchronous blocking_io()
call because the underlying TCP read and write operations are also synchronous and do block (if you put the socket in non-blocking mode, you'd need blocking select calls to know when data is available).
The asyncio.to_thread
function does run in a separate thread since this is the only way to prevent blocking (when using a simple high-level wrapper). So, if you call this often, you will indeed have lots of overhead, and this may defeat the purpose of using asyncio.
There is no easy way out. Either you select the convenience of asyncio.to_thread
(with it's overhead) or you go all the way in and rewrite all code, including the existing clients, using asyncio.streams
or you go half-way in by making the existing high-level synchronous code async (but this will only help if you can also make their lowest level functions -- communications with the sockets -- async).