Tags: python, python-multiprocessing, python-3.7, python-sockets

How do I add a timeout to multiprocessing.connection.Client(..) in Python 3.7?


I've got two Python programs running. Program A connects to program B with the multiprocessing module:

# Connection code in program A
# -----------------------------
import multiprocessing
import multiprocessing.connection

...

connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)

...

connection.send(send_data)

recv_data = connection.recv()

It works perfectly most of the time. However, sometimes program B is frozen (the details don't matter much, but it usually happens when the GUI from program B spawns a modal window).
While program B is frozen, program A hangs at the following line:

connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)

It keeps waiting for a response. I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one.

How can I implement a timeout here?

Notes:
I'm working on a Windows 10 computer with Python 3.7.


Solution

  • I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one. How can I implement a timeout here?

    Looking at the source to multiprocessing.connection in Python 3.7, the Client() function is a fairly brief wrapper around SocketClient() for your use case, which in turn wraps Connection().

    At first it looked fairly straightforward to write a ClientWithTimeout wrapper that does the same thing, but additionally calls settimeout() on the socket it creates for the connection. However, this does not have the correct effect, because:

    1. Python implements its own socket timeout behaviour by using select() and an underlying non-blocking OS socket; this behaviour is what is configured by settimeout().

    2. Connection operates directly on an OS socket handle, which is returned by calling detach() on the normal Python socket object.

    3. Since Python has set the OS socket handle to the non-blocking mode, recv() calls on it return immediately rather than waiting for the timeout period.
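    The effect described in points 1 to 3 can be seen with a small POSIX-only experiment (os.get_blocking() and os.read() on a socket descriptor are not usable this way on Windows). The helper name inspect_detached_socket is hypothetical, and a socketpair stands in for the real connection to program B so that no data ever arrives. As far as I can tell from the 3.7 source, Connection reads its handle with os.read() on POSIX, so reading the detached descriptor the same way should mirror what happens inside answer_challenge().

```python
import os
import socket
import time


def inspect_detached_socket(timeout=3.0):
    """Hypothetical helper (POSIX only).

    Gives a socket a Python-level timeout, detaches it and reads from the raw
    descriptor, the way Connection does on POSIX. Returns a tuple
    (os_level_blocking, failed_immediately, elapsed_seconds).
    """
    a, b = socket.socketpair()  # b stays open, so a has no data to read but no EOF either
    try:
        a.settimeout(timeout)   # CPython implements this with a non-blocking OS socket
        fd = a.detach()         # raw descriptor, like the one Connection is given
        try:
            os_level_blocking = os.get_blocking(fd)
            start = time.monotonic()
            try:
                os.read(fd, 1)               # nothing has been sent yet
                failed_immediately = False
            except BlockingIOError:          # EAGAIN/EWOULDBLOCK without any waiting
                failed_immediately = True
            return os_level_blocking, failed_immediately, time.monotonic() - start
        finally:
            os.close(fd)
    finally:
        a.close()  # harmless no-op after detach()
        b.close()


if __name__ == "__main__":
    # Expect the OS socket to be non-blocking and the read to fail at once,
    # rather than waiting for the 3-second timeout.
    print(inspect_detached_socket())
```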

    However, we can still set a receive timeout on the underlying OS socket handle by using the low-level SO_RCVTIMEO socket option.
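    As a counterpart, the sketch below (again POSIX-only, helper names hypothetical) leaves the socket in blocking mode, sets SO_RCVTIMEO on it directly, detaches it and reads from the raw descriptor. The read now waits for roughly the timeout and then fails with EAGAIN, which Python raises as BlockingIOError. It packs struct timeval as two native unsigned longs, the same platform assumption discussed for ClientWithTimeout.

```python
import os
import socket
import struct
import time


def pack_timeval(timeout):
    """Pack seconds as a C struct timeval, assuming two native unsigned longs (POSIX)."""
    seconds = int(timeout)
    microseconds = int((timeout - seconds) * 1e6)
    return struct.pack("@LL", seconds, microseconds)


def timed_read_on_detached_socket(timeout=0.5):
    """Hypothetical helper: return (failed_with_timeout, elapsed_seconds)."""
    a, b = socket.socketpair()  # b stays open but never sends anything
    try:
        a.setblocking(True)     # OS-level blocking mode, unlike settimeout()
        a.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, pack_timeval(timeout))
        fd = a.detach()
        try:
            start = time.monotonic()
            try:
                os.read(fd, 1)               # blocks until SO_RCVTIMEO expires
                failed_with_timeout = False
            except BlockingIOError:          # EAGAIN/EWOULDBLOCK once the timeout elapses
                failed_with_timeout = True
            return failed_with_timeout, time.monotonic() - start
        finally:
            os.close(fd)
    finally:
        a.close()  # harmless no-op after detach()
        b.close()


if __name__ == "__main__":
    print(timed_read_on_detached_socket(0.5))
```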

    Hence my solution:

    from multiprocessing.connection import Connection, answer_challenge, deliver_challenge
    import socket, struct, sys
    
    def ClientWithTimeout(address, authkey, timeout):
    
        with socket.socket(socket.AF_INET) as s:
            s.setblocking(True)
            s.connect(address)
    
            # We'd like to call s.settimeout(timeout) here, but that won't work.
    
            # Instead, prepare the raw SO_RCVTIMEO option value. On Windows it
            # is a DWORD holding the timeout in milliseconds; elsewhere it is a
            # C "struct timeval", whose field sizes may differ by platform.
            if sys.platform == "win32":
                optval = struct.pack("@L", int(timeout * 1000))
            else:
                seconds = int(timeout)
                microseconds = int((timeout - seconds) * 1e6)
                optval = struct.pack("@LL", seconds, microseconds)
    
            # And then set the SO_RCVTIMEO (receive timeout) option with this.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, optval)
    
            # Now create the connection as normal.
            c = Connection(s.detach())
    
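        # The handshake now raises OSError if the other side does not respond
        # within `timeout` seconds; close the connection before re-raising.
        try:
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            c.close()
            raise
    
        return c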
    

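    For brevity, I have assumed the parameters are as per your example, i.e. that address is a (host, port) tuple, implying the AF_INET address family, and that authkey is a bytes object.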

    If you need to handle cases where these assumptions don't hold then you will need to copy a little more logic from Client() and SocketClient().

    Although I looked at the multiprocessing.connection source to find out how to do this, my solution does not use any private implementation details. Connection, answer_challenge and deliver_challenge are all public and documented parts of the API. This function should therefore be safe to use with future versions of multiprocessing.connection.

    Note that SO_RCVTIMEO may not be supported on all platforms, but it is present on at least Windows, Linux and OSX. The format of its option value is also platform-specific: on Windows it is a DWORD giving the timeout in milliseconds, while on POSIX systems it is a C struct timeval. For struct timeval I have assumed that the two fields are always of the native unsigned long type. I think this should be correct on common platforms, but it is not guaranteed to always be so. Unfortunately, Python does not currently provide a platform-independent way to do this.
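    One more caveat: the receive timeout remains set on the socket after ClientWithTimeout() returns, so later connection.recv() calls in program A will also fail if program B takes longer than timeout to answer. If that is not wanted, the variant below is one possible approach, sketched under the same assumptions (an AF_INET (host, port) address and a bytes authkey) and not tested on every platform; client_with_handshake_timeout and _rcvtimeo_value are hypothetical names. It also bounds connect() itself, and keeps a duplicate socket handle so the option can be cleared once the handshake has succeeded.

```python
import socket
import struct
import sys
from multiprocessing.connection import Connection, answer_challenge, deliver_challenge


def _rcvtimeo_value(timeout):
    """Raw SO_RCVTIMEO option value for `timeout` seconds (0 means no timeout).

    Windows expects a DWORD in milliseconds; elsewhere a C struct timeval is
    assumed to consist of two native unsigned longs, as in ClientWithTimeout.
    """
    if sys.platform == "win32":
        return struct.pack("@L", int(timeout * 1000))
    seconds = int(timeout)
    microseconds = int((timeout - seconds) * 1e6)
    return struct.pack("@LL", seconds, microseconds)


def client_with_handshake_timeout(address, authkey, timeout):
    """Hypothetical variant of ClientWithTimeout (AF_INET address, bytes authkey).

    Bounds connect() as well as the authentication handshake, then clears the
    receive timeout so later recv() calls on the returned Connection block normally.
    """
    with socket.socket(socket.AF_INET) as s:
        s.settimeout(timeout)     # bound connect() with Python's own timeout...
        s.connect(address)
        s.settimeout(None)        # ...then make the OS socket blocking again
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, _rcvtimeo_value(timeout))
        # A duplicate handle to the same socket lets us change the option again
        # after Connection has taken ownership of the original handle.
        keeper = s.dup()
        c = Connection(s.detach())

    try:
        try:
            answer_challenge(c, authkey)   # raises OSError if the peer stays silent
            deliver_challenge(c, authkey)
        except BaseException:
            c.close()
            raise
        # Handshake succeeded: remove the receive timeout (0 = wait indefinitely).
        keeper.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, _rcvtimeo_value(0))
    finally:
        keeper.close()
    return c
```

    In program A this would be called inside try/except OSError, treating an exception as "program B is not responding" and retrying later.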

    Below is a test program which shows this working. It assumes the ClientWithTimeout code above is saved as client_timeout.py.

    from multiprocessing.connection import Client, Listener
    from client_timeout import ClientWithTimeout
    from threading import Thread
    from time import time, sleep
    
    addr = ('localhost', 19191)
    key = 'embeetle'.encode('utf-8')
    
    # Provide a listener which either does or doesn't accept connections.
    class ListenerThread(Thread):
    
        def __init__(self, accept):
            Thread.__init__(self)
            self.accept = accept
    
        def __enter__(self):
            if self.accept:
                print("Starting listener, accepting connections")
            else:
                print("Starting listener, not accepting connections")
            self.active = True 
            self.start()
            sleep(0.1)
    
        def run(self):
            listener = Listener(addr, authkey=key)
            self.active = True
            if self.accept:
                listener.accept()
            while self.active:
                sleep(0.1)
            listener.close()
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.active = False
            self.join()
            print("Stopped listener")
            return True
    
    for description, accept, name, function in [
            ("ClientWithTimeout succeeds when the listener accepts connections.",
            True, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
            ("ClientWithTimeout fails after 3s when listener doesn't accept connections.",
            False, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
            ("Client succeeds when the listener accepts connections.",
            True, "Client", lambda: Client(addr, authkey=key)),
            ("Client hangs when the listener doesn't accept connections (use ctrl-C to stop).",
            False, "Client", lambda: Client(addr, authkey=key))]:
    
        print("Expected result:", description)
    
        with ListenerThread(accept):
            start_time = time()
            try:
                print("Creating connection using %s... " % name)
                client = function()
                print("Client created:", client)
            except Exception as e:
                print("Failed:", e)
            print("Time elapsed: %f seconds" % (time() - start_time))
    
        print()
    

    Running this on Linux produces the following output:

    Expected result: ClientWithTimeout succeeds when the listener accepts connections.
    Starting listener, accepting connections
    Creating connection using ClientWithTimeout... 
    Client created: <multiprocessing.connection.Connection object at 0x7fad536884e0>
    Time elapsed: 0.003276 seconds
    Stopped listener
    
    Expected result: ClientWithTimeout fails after 3s when listener doesn't accept connections.
    Starting listener, not accepting connections
    Creating connection using ClientWithTimeout... 
    Failed: [Errno 11] Resource temporarily unavailable
    Time elapsed: 3.157268 seconds
    Stopped listener
    
    Expected result: Client succeeds when the listener accepts connections.
    Starting listener, accepting connections
    Creating connection using Client... 
    Client created: <multiprocessing.connection.Connection object at 0x7fad53688c50>
    Time elapsed: 0.001957 seconds
    Stopped listener
    
    Expected result: Client hangs when the listener doesn't accept connections (use ctrl-C to stop).
    Starting listener, not accepting connections
    Creating connection using Client... 
    ^C
    Stopped listener