pythonsocketsservertcppynng

Making pynng and socket talk to each other


TL;DR

I spin up a server using pynng, then a client from Python Standard Library socket will try to send messages to it.

The problem is that client can send the message, but server is oblivious to it. Therefore, it doesn't work.

Am I missing something? Some low-level protocol setting? Some termination character?

The reason why I'm doing this is that I will build a Python script that uses pynng to act as a server. Then a non-Python program (which I assume has knowledge of basic TCP protocols) will try to talk with this Python server. Thus I am using the IMHO most primitive socket library I could operate, the socket module in the standard library.

The details

I will present code snippets as I discuss, but I will show the full minimal code example at the end.

I am trying to spin up a server using pynng

def server():
    with pynng.Pair0(listen=f'tcp://{HOST:s}:{PORT:d}', recv_timeout=10000) as s:
        print("Server running")
        data = s.recv()  # Blocks forever here
        print(data)

Then, client that looks like this will try to connect to it:

def client():
    with socket.create_connection(address=(HOST, PORT), timeout=5) as s:
        print("Client connected")
        s.sendall(b'Hello world')
        print("Client sent message")

I put them all together in using threading:

def main():
    srv = threading.Thread(target=server)
    cli = threading.Thread(target=client)

    srv.start()
    cli.start()

    srv.join()
    cli.join()

Minimum working code

All told, this is the minimum working code:

import socket
import pynng
import threading

HOST = "127.0.0.1"
PORT = 65432

def main():
    srv = threading.Thread(target=server)
    cli = threading.Thread(target=client)

    srv.start()
    cli.start()

    srv.join()
    cli.join()

def server():
    with pynng.Pair0(listen=f'tcp://{HOST:s}:{PORT:d}', recv_timeout=10000) as s:
        print("Server running")
        data = s.recv()  # Blocks forever here
        print("Message received")
        print(data)

def client():
    with socket.create_connection(address=(HOST, PORT), timeout=5) as s:
        print("Client connected")
        s.sendall(b'Hello world')
        print("Client sent message")


if __name__ == "__main__":
    main()

Then I run this in the terminal

$ python main.py

It seems that the server is unable to recv messages, and the recv attempt thus times out at 10000ms.

Server running
Client connected
Client sent message
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/kmonisit/miniconda3/envs/engg/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/kmonisit/miniconda3/envs/engg/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 39, in server
    data = s.recv()  # Blocks forever here
  File "/home/kmonisit/miniconda3/envs/engg/lib/python3.8/site-packages/pynng/nng.py", line 454, in recv
    check_err(ret)
  File "/home/kmonisit/miniconda3/envs/engg/lib/python3.8/site-packages/pynng/exceptions.py", line 201, in check_err
    raise exc(string, err)
pynng.exceptions.Timeout: Timed out

Solution

  • pynng is based on Nanomsg Next Generation, which is an implementation of the Scalability Protocols. The scalability protocols work on many different transports, including tcp, but bare sockets are not compatible. However, with a little bit of prayer and elbow grease, they can be made compatible. Which is too say, you can implement the scalability protocols in pure Python if need be.

    First, we need to know what the wire format is; thankfully that is documented in an RFC in the original nanomsg repository. An implementation of a Pair0 client is here:

    class Pair0:
        """A poor implementation of the Pair0 protocol"""
    
        def __init__(self, host, port, timeout=None):
            self._sock = socket.create_connection(address=(host, port), timeout=timeout)
            # https://github.com/nanomsg/nanomsg/blob/master/rfc/sp-tcp-mapping-01.txt
            # upon making a connection, both ends are required to send this header
            self._sock.send(b'\x00SP\x00\x00\x10\x00\x00')
            print(self._sock.recv(8))
    
        def send(self, data):
            # messages are simply "length + payload".  Length is 64-bit in network byte
            # order.
            packed = struct.pack('!Q', len(data))
            self._sock.sendall(packed + data)
    
        def recv(self):
            size_bytes = self._sock.recv(8)
            (size,) = struct.unpack('!Q', size_bytes)
            received = 0
            parts = []
            while received < size:
                data = self._sock.recv(size - received)
                received += len(data)
                parts.append(data)
            return b''.join(parts)
    

    And integrated into your test program:

    import socket
    import struct
    import pynng
    import threading
    import time
    
    HOST = "127.0.0.1"
    PORT = 65432
    
    
    def main():
        srv = threading.Thread(target=server)
    
        srv.start()
        # sleep to give the server time to bind to the address
        time.sleep(0.1)
        _client = Pair0(HOST, PORT, 1)
        _client.send(b'hello pynng')
        _client.send(b'hope everything is going well for you')
        print(_client.recv())
        print(_client.recv())
        srv.join()
    
    
    def server():
        with pynng.Pair0(listen=f'tcp://{HOST:s}:{PORT:d}', recv_timeout=1000) as s:
            print("Server running")
            for _ in range(2):
                data = s.recv()
                print("Message received")
                print(data)
            s.send(b'hello bad client')
            s.send(b'I hope you are doing okay')
    
    
    class Pair0:
        """A poor implementation of the Pair0 protocol"""
    
        def __init__(self, host, port, timeout=None):
            self._sock = socket.create_connection(address=(host, port), timeout=timeout)
            # https://github.com/nanomsg/nanomsg/blob/master/rfc/sp-tcp-mapping-01.txt
            # upon making a connection, both ends are required to send this header
            self._sock.send(b'\x00SP\x00\x00\x10\x00\x00')
            print(self._sock.recv(8))
    
        def send(self, data):
            # messages are simply "length + payload".  Length is 64-bit in network byte
            # order.
            packed = struct.pack('!Q', len(data))
            self._sock.sendall(packed + data)
    
        def recv(self):
            size_bytes = self._sock.recv(8)
            (size,) = struct.unpack('!Q', size_bytes)
            received = 0
            parts = []
            while received < size:
                data = self._sock.recv(size - received)
                received += len(data)
                parts.append(data)
            return b''.join(parts)
    
    
    if __name__ == "__main__":
        main()
    

    Now, this is nowhere near as robust as the implementation in pynng (which relies on the underlying nng implementation). nng does The Right Thing™ in edge conditions, including losing network, handling multiple clients, keeping track of state machines, handling SIGINT, etc. This is also an imcomplete implementation, as it does not bind, etc.

    Disclaimer: I am the author of pynng.