
Wrapping and integrating an existing Python `socket`-based class with `asyncio`


I have a use-case where there are a lot of TCP/IP clients being bundled up into a single Python process. The current desire is to use asyncio to provide concurrency for the program. For all new TCP/IP clients, asyncio streams will be used, and for all new HTTP clients, aiohttp will be used.

However, there are several existing clients that are written using Python's socket module. My question is: how should you "wrap" the existing socket-based classes and methods with async?


Example existing TCP/IP client:

import socket

class ExistingClient:
    def __init__(self, host: str, port: int) -> None:
        self.__host = host
        self.__port = port
        self.__socket: socket.socket | None = None

    def initialize(self) -> None:
        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__socket.connect((self.__host, self.__port))

    def get_status(self) -> int:
        self.__socket.send("2\n".encode())
        data: str = str(self.__socket.recv(1024).decode()).strip()
        return int(data)

    def close(self) -> None:
        self.__socket.close()

Example TCP/IP server

You can run this server script to test the client script below. Save it as server.py.

import asyncio
import functools

class State:
    def __init__(self) -> None:
        self.__count: int = 0

    def add_to_count(self, count: int) -> None:
        self.__count = self.__count + count

    @property
    def count(self) -> int:
        return self.__count


async def handle_echo(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter, state: State
):
    # Ignore the use of the infinite while loop for this example.
    # Controlling the loop would be handled in a more sophisticated way.
    while True:
        data = await reader.readline()
        if not data:
            # An empty read means the client closed the connection.
            break
        message: int = int(data.decode())
        state.add_to_count(message)

        writer.write(f"{state.count}\n".encode())
        await writer.drain()

    writer.close()
    await writer.wait_closed()


async def server(port: int):
    state = State()
    partial = functools.partial(handle_echo, state=state)
    server = await asyncio.start_server(partial, "127.0.0.1", port)

    address = ", ".join(str(sock.getsockname()) for sock in server.sockets)
    print(f"Serving on {address}")

    async with server:
        await server.serve_forever()


async def main():
    await asyncio.gather(
        server(8888),
        server(8889),
    )

asyncio.run(main())

Possible solution 1

To me, one solution seems to be that the existing class(es) could be edited to add async methods that simply defer to the blocking socket calls. For example:

import socket

class ExistingClient:
    def __init__(self, host: str, port: int) -> None:
        self.__host = host
        self.__port = port
        self.__socket: socket.socket | None = None

    def initialize(self) -> None:
        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__socket.connect((self.__host, self.__port))

    async def async_initialize(self) -> None:
        self.initialize()

    def get_status(self) -> int:
        self.__socket.send("2\n".encode())
        data: str = str(self.__socket.recv(1024).decode()).strip()
        return int(data)

    async def async_get_status(self) -> int:
        return self.get_status()

    def close(self) -> None:
        self.__socket.close()

    async def async_close(self) -> None:
        self.close()

Does this work in the sense of not blocking the asyncio event loop, so that it behaves much like asyncio streams calls? Here is how I envision integrating it with normal asyncio code (saved in the same file as the modified ExistingClient directly above, say client.py):

import asyncio

async def streams_tcp_client():
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)

    writer.write("1\n".encode())
    await writer.drain()

    data: bytes = await reader.readline()
    print(f"asyncio streams data: {data.decode().strip()}")

    writer.close()
    await writer.wait_closed()


async def existing_tcp_client1():
    existing_client = ExistingClient("127.0.0.1", 8889)
    await existing_client.async_initialize()

    data: int = await existing_client.async_get_status()
    print(f"Existing client data: {data}")

    await existing_client.async_close()


async def main():
    await asyncio.gather(
        streams_tcp_client(),
        existing_tcp_client1(),
    )

Does this work as one might expect, so that the ExistingClient async methods, which contain blocking I/O calls, do not block the asyncio event loop?

I have run this code, and it runs and prints the expected data. But it is unclear how to test whether the event loop is running as expected or desired.
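
One rough way to check this is to run a heartbeat task next to the call under test and measure how late each tick arrives. The sketch below is self-contained: time.sleep stands in for a blocking socket call (an assumption, so no server is needed), and the names blocking_io, async_blocking_io, heartbeat, and max_loop_lag are hypothetical helpers, not part of asyncio.

```python
import asyncio
import time


def blocking_io(seconds: float) -> None:
    # Stand-in for a blocking socket call such as recv() (assumption for this sketch).
    time.sleep(seconds)


async def async_blocking_io(seconds: float) -> None:
    # The "Possible solution 1" pattern: async in name only.
    blocking_io(seconds)


async def heartbeat(interval: float, lags: list[float]) -> None:
    # Record how much later than `interval` each tick actually fires.
    while True:
        start = time.monotonic()
        await asyncio.sleep(interval)
        lags.append(time.monotonic() - start - interval)


async def max_loop_lag(make_call, interval: float = 0.01) -> float:
    # `make_call` is a zero-argument callable returning an awaitable.
    lags: list[float] = []
    task = asyncio.create_task(heartbeat(interval, lags))
    await asyncio.sleep(interval * 3)  # let a few undisturbed ticks happen
    await make_call()
    await asyncio.sleep(interval * 3)  # give a delayed tick time to be recorded
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max(lags)


async def main() -> None:
    wrapped = await max_loop_lag(lambda: async_blocking_io(0.3))
    threaded = await max_loop_lag(lambda: asyncio.to_thread(blocking_io, 0.3))
    print(f"async wrapper: worst heartbeat delay {wrapped:.3f}s")
    print(f"to_thread:     worst heartbeat delay {threaded:.3f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

If the wrapper blocks the loop, the worst heartbeat delay should be close to the duration of the blocking call, while a call that leaves the loop free should only delay the heartbeat slightly. asyncio's debug mode (asyncio.run(main(), debug=True)) gives a similar hint by logging callbacks that run longer than loop.slow_callback_duration, which is 0.1 seconds by default.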


Possible solution 2

I have seen some mention of asyncio.to_thread. In the documentation, it states:

This coroutine function is primarily intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread.

However, this does not really explain anything. And it doesn't explain why simply defining

async def async_blocking_io():
    blocking_io()

is not enough to not block the event loop. Because the actual blocking I/O, such as a TCP/IP write and read operation, shouldn't block the event loop's thread, correct?

Would the way to use this be the following:

async def existing_tcp_client2():
    existing_client = ExistingClient("127.0.0.1", 8889)
    await asyncio.to_thread(existing_client.initialize)

    data: int = await asyncio.to_thread(existing_client.get_status)
    print(f"Existing client data with to_thread: {data}")

    await asyncio.to_thread(existing_client.close)

This runs if I modify main to be:

async def main():
    await asyncio.gather(
        streams_tcp_client(),
        existing_tcp_client2(),
    )

Final thoughts

What is the difference between writing async def async_<existing_method> methods and using asyncio.to_thread? The asyncio.to_thread approach is concerning, because each call would be run in a new thread? That could be an issue for thread-unsafe classes, and it also creates overhead by constantly spawning new threads.

What are the other solutions to this problem?


Solution

  • Solution 1 - simply wrapping synchronous functions with an async wrapper - will still block the event loop, and with it every other task in the process. In a TCP server/client you could try to work around this by rewriting the lower-level functions that send data to and read data from the socket, making those functions async as well and processing the communication in smaller chunks. But if you don't do that, solution 1 cannot give you more concurrency.

    If you simply define

    async def async_blocking_io():
        blocking_io()
    

    then that is going to block the event loop. Even if you add a formal await statement like this:

    async def async_blocking_io():
        await asyncio.sleep(0) # first gives other tasks a chance
        blocking_io()
    

    Now other coroutines might run first, but you're still going to block in the synchronous blocking_io() call, because the underlying TCP read and write operations are also synchronous and do block (if you put the socket in non-blocking mode, you'd need blocking select calls to know when data is available).

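    The asyncio.to_thread function does run the call in a separate thread, since this is the only way to prevent blocking (when using a simple high-level wrapper). The thread comes from the event loop's default thread pool executor, so worker threads are reused rather than created for every call, but each call still pays for the hand-off between threads, and successive calls are not guaranteed to run on the same thread. So, if you call this often, the overhead adds up, and this may defeat the purpose of using asyncio.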

    There is no easy way out. Either you choose the convenience of asyncio.to_thread (with its overhead), or you go all the way and rewrite all the code, including the existing clients, using asyncio streams, or you go halfway by making the existing high-level synchronous code async (but this only helps if you can also make the lowest-level functions, the communication with the sockets, async).
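
The halfway option could look roughly like the sketch below, which keeps the raw socket but replaces the blocking connect, send, and recv calls with the event loop's low-level socket methods (loop.sock_connect, loop.sock_sendall, loop.sock_recv). These methods require the socket to be in non-blocking mode. AsyncSocketClient is a hypothetical name, and the sketch assumes the newline-terminated replies of the example server from the question.

```python
import asyncio
import socket


class AsyncSocketClient:
    """Hypothetical async counterpart of ExistingClient that keeps the raw socket."""

    def __init__(self, host: str, port: int) -> None:
        self._host = host
        self._port = port
        self._socket: socket.socket | None = None

    async def initialize(self) -> None:
        loop = asyncio.get_running_loop()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.setblocking(False)  # required by the loop.sock_* methods
        await loop.sock_connect(self._socket, (self._host, self._port))

    async def get_status(self) -> int:
        assert self._socket is not None, "call initialize() first"
        loop = asyncio.get_running_loop()
        await loop.sock_sendall(self._socket, b"2\n")
        # Assumption: the reply is a single newline-terminated line.
        buffer = b""
        while not buffer.endswith(b"\n"):
            chunk = await loop.sock_recv(self._socket, 1024)
            if not chunk:
                raise ConnectionError("server closed the connection")
            buffer += chunk
        return int(buffer.decode().strip())

    async def close(self) -> None:
        if self._socket is not None:
            self._socket.close()
            self._socket = None
```

Each await here yields to the event loop while the socket is not ready, so other tasks keep running without any extra threads. The cost is that every blocking call in the existing class has to be found and converted, which is the rewrite effort described above.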