Tags: python, asyncio, nanomsg, pynng

pynng: how to set up, and keep using, multiple Contexts on a REP0 socket


I'm working on a "server" thread, which takes care of some IO calls for a bunch of "clients".

The communication is done using pynng v0.5.0, the server has its own asyncio loop.

Each client "registers" by sending a first request, then loops: receiving the result and sending back a READY message.

On the server side, the goal is to treat the first message from each client as a registration request, and to spawn a dedicated worker task that loops: doing the IO work, sending the result, and waiting for that particular client's READY message.

To implement this, I'm trying to leverage the Context feature of REP0 sockets.


Problem:

After a few iterations, some READY messages are intercepted by the registration coroutine of the server, instead of being routed to the proper worker task.

Since I can't share the code, I wrote a reproducer for my issue and included it below.

Worse, as you can see in the output, some result messages are sent to the wrong client (ERROR:root:<Worker 1>: worker/client mismatch, exiting.).

It looks like a bug, but I'm not entirely sure I understand how to use the contexts correctly, so any help would be appreciated.

Environment: Windows (asyncio IocpProactor event loop), pynng v0.5.0.

Code:

import asyncio
import logging
import pynng
import threading

NNG_DURATION_INFINITE = -1
ENDPOINT = 'inproc://example_endpoint'


class Server(threading.Thread):
    def __init__(self):
        super(Server, self).__init__()
        self._client_tasks = dict()

    @staticmethod
    async def _worker(ctx, client_id):
        while True:
            # Remember, the first 'receive' has already been done by self._new_client_handler()

            logging.debug(f"<Worker {client_id}>: doing some IO")
            await asyncio.sleep(1)

            logging.debug(f"<Worker {client_id}>: sending the result")
            # I already tried sending synchronously here instead, just in case the issue was related to that
            # (but it's not)
            await ctx.asend(f"result data for client {client_id}".encode())

            logging.debug(f"<Worker {client_id}>: waiting for client READY msg")
            data = await ctx.arecv()
            logging.debug(f"<Worker {client_id}>: received '{data}'")
            if data != bytes([client_id]):
                logging.error(f"<Worker {client_id}>: worker/client mismatch, exiting.")
                return

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            max_workers = 3 + 1  # Try setting it to 3 instead, to stop creating new contexts => now it works fine
            while await asyncio.sleep(0, result=True) and len(self._client_tasks) < max_workers:
                # The issue is here: at some point, the existing client READY messages get
                # intercepted here, instead of being routed to the proper worker context.
                # The intent here was to open a new context only for each *new* client, I was
                # assuming that a 'recv' on older worker contexts would take precedence.
                ctx = socket.new_context()
                data = await ctx.arecv()
                client_id = data[0]

                if client_id in self._client_tasks:
                    logging.error(f"<Server>: We already have a task for client {client_id}")
                    continue  # just let the client block on its 'recv' for now

                logging.debug(f"<Server>: New client : {client_id}")
                self._client_tasks[client_id] = asyncio.create_task(self._worker(ctx, client_id))

            await asyncio.gather(*list(self._client_tasks.values()))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=True)


class Client(threading.Thread):
    def __init__(self, client_id: int):
        super(Client, self).__init__()
        self._id = client_id

    def __repr__(self):
        return f'<Client {self._id}>'

    def run(self):
        with pynng.Req0(dial=ENDPOINT, resend_time=NNG_DURATION_INFINITE) as socket:
            while True:
                logging.debug(f"{self}: READY")
                socket.send(bytes([self._id]))
                data_str = socket.recv().decode()
                logging.debug(f"{self}: received '{data_str}'")
                if data_str != f"result data for client {self._id}":
                    logging.error(f"{self}: client/worker mismatch, exiting.")
                    return


def main():
    logging.basicConfig(level=logging.DEBUG)
    threads = [Server(),
               *[Client(i) for i in range(3)]]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

Output:

DEBUG:asyncio:Using proactor: IocpProactor
DEBUG:root:<Client 1>: READY
DEBUG:root:<Client 0>: READY
DEBUG:root:<Client 2>: READY
DEBUG:root:<Server>: New client : 1
DEBUG:root:<Worker 1>: doing some IO
DEBUG:root:<Server>: New client : 0
DEBUG:root:<Worker 0>: doing some IO
DEBUG:root:<Server>: New client : 2
DEBUG:root:<Worker 2>: doing some IO
DEBUG:root:<Worker 1>: sending the result
DEBUG:root:<Client 1>: received 'result data for client 1'
DEBUG:root:<Client 1>: READY
ERROR:root:<Server>: We already have a task for client 1
DEBUG:root:<Worker 1>: waiting for client READY msg
DEBUG:root:<Worker 0>: sending the result
DEBUG:root:<Client 0>: received 'result data for client 0'
DEBUG:root:<Client 0>: READY
DEBUG:root:<Worker 0>: waiting for client READY msg
DEBUG:root:<Worker 1>: received 'b'\x00''
ERROR:root:<Worker 1>: worker/client mismatch, exiting.
DEBUG:root:<Worker 2>: sending the result
DEBUG:root:<Client 2>: received 'result data for client 2'
DEBUG:root:<Client 2>: READY
DEBUG:root:<Worker 2>: waiting for client READY msg
ERROR:root:<Server>: We already have a task for client 2

Edit (2020-04-10): updated both pynng and the underlying nng.lib to their latest version (master branches), still the same issue.


Solution

  • After digging into the sources of both nng and pynng, and confirming my understanding with the maintainers, I can now answer my own question.

    When using a context on a REP0 socket, there are a few things to be aware of.

    As advertised, send/asend() is guaranteed to be routed to the same peer you last received from.

    The data from the next recv/arecv() on this same context, however, is NOT guaranteed to be coming from the same peer.

    Actually, the underlying nng call to rep0_ctx_recv() merely reads the next socket pipe with available data, so there's no guarantee that said data is coming from the same peer as the last recv/send pair.

    In the reproducer above, I was concurrently calling arecv() both on a new context (in the Server._new_client_handler() coroutine), and on each worker context (in the Server._worker() coroutine).

    So what I had previously described as the next request being "intercepted" by the main coroutine was merely a race condition.
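    To make the race concrete without involving pynng at all, here is a stdlib-only analogy: the shared REP0 socket is modelled as a single asyncio.Queue, and both a "registration" coroutine and an already-registered "worker" await a receive on it at the same time. The names and structure are mine, not pynng API; the point is only that whichever pending receive the runtime picks first wins. asyncio happens to wake pending getters in FIFO order, so the registration coroutine, which started waiting first, steals the worker's READY message:

```python
import asyncio

async def main():
    socket = asyncio.Queue()      # stands in for the shared REP0 socket
    received_by = {}

    async def registration():
        # the server's registration coroutine, waiting for a *new* client
        received_by["registration"] = await socket.get()

    async def worker():
        # an already-registered worker, waiting for *its* client's READY
        received_by["worker"] = await socket.get()

    reg = asyncio.create_task(registration())
    await asyncio.sleep(0)        # registration starts waiting first
    wrk = asyncio.create_task(worker())
    await asyncio.sleep(0)        # now both receives are pending

    # The client sends its READY message: asyncio wakes pending getters
    # in FIFO order, so registration steals it from the worker.
    await socket.put(b"READY")
    await reg
    wrk.cancel()
    return received_by

received_by = asyncio.run(main())
print(received_by)                # {'registration': b'READY'}
```

    In pynng the winner is decided by nng's internal scheduling rather than FIFO order, so it only bites intermittently, but the failure mode is the same: two concurrent receives, one message, wrong recipient.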

    One solution would be to only receive from the Server._new_client_handler() coroutine, and have the workers only handle one request. Note that in this case, the workers are no longer dedicated to a particular peer. If this behavior is needed, the routing of incoming requests must be handled at application level.

    import random  # new dependency for the randomized sleep below

    class Server(threading.Thread):
        @staticmethod
        async def _worker(ctx, data: bytes):
            client_id = int.from_bytes(data, byteorder='big', signed=False)
            logging.debug(f"<Worker {client_id}>: doing some IO")
            await asyncio.sleep(1 + 10 * random.random())
    
            logging.debug(f"<Worker {client_id}>: sending the result")
            await ctx.asend(f"result data for client {client_id}".encode())
    
        async def _new_client_handler(self):
            with pynng.Rep0(listen=ENDPOINT) as socket:
                while await asyncio.sleep(0, result=True):
                    ctx = socket.new_context()
                    data = await ctx.arecv()
                    asyncio.create_task(self._worker(ctx, data))
    
        def run(self) -> None:
            # The "server" thread has its own asyncio loop
            asyncio.run(self._new_client_handler(), debug=False)
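    If dedicated per-peer workers are really needed, the application-level routing mentioned above can be done with per-client queues: a single dispatcher owns every receive, and each worker only ever reads its own queue, so no two coroutines compete for a message. This is a design sketch, not pynng code: the transport is stubbed with an asyncio.Queue standing in for the socket, and the dispatcher/worker names are hypothetical.

```python
import asyncio

async def dispatcher(transport: asyncio.Queue, client_queues: dict):
    # Single receive loop: every message is read here, then routed to the
    # per-client queue, so receives never race with each other.
    while True:
        client_id, payload = await transport.get()
        client_queues.setdefault(client_id, asyncio.Queue())
        await client_queues[client_id].put(payload)

async def worker(client_id: int, queue: asyncio.Queue, results: list):
    # A worker dedicated to one client only ever reads its own queue.
    for _ in range(2):
        results.append((client_id, await queue.get()))

async def main():
    transport = asyncio.Queue()   # stands in for the REP0 socket
    client_queues, results = {}, []
    dispatch = asyncio.create_task(dispatcher(transport, client_queues))

    # Two clients send interleaved requests over the shared transport.
    for msg in [(1, b"a"), (2, b"x"), (1, b"b"), (2, b"y")]:
        await transport.put(msg)
    await asyncio.sleep(0)        # let the dispatcher drain the transport

    await asyncio.gather(*[worker(cid, q, results)
                           for cid, q in client_queues.items()])
    dispatch.cancel()
    return results

results = asyncio.run(main())
print(results)  # each worker saw only its own client's messages, in order
```

    With real pynng sockets the dispatcher would be the only place calling arecv(), and replies would still go out on whichever context the request arrived on, preserving the REP0 send-routing guarantee.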