Tags: python, multiprocessing, synchronization, multiprocessing-manager

How do read and writes work with a manager in Python?


Sorry if this is a stupid question, but I'm having trouble understanding how managers work in Python.

Let's say I have a manager that contains a dictionary to be shared across all processes. I want to have just one process writing to the dictionary at a time, while many others read from the dictionary.

  1. Can this happen concurrently, with no synchronization primitives or will something break if read/writes happen at the same time?
  2. What if I want to have multiple processes writing to the dictionary at once - is that allowed or will it break (I know it could cause race conditions, but could it error out)?
  3. Additionally, does a manager process each read and write transaction in a queue-like fashion, one at a time, or does it handle them all at once?

https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes


Solution

  • It depends on how you write to the dictionary, i.e. whether the operation is atomic or not:

    my_dict[some_key] = 9 # this is atomic
    my_dict[some_key] += 1 # this is not atomic
    

    So creating a new key or updating an existing key, as in the first line of code above, is an atomic operation. But the second line is really multiple operations, equivalent to:

    temp = my_dict[some_key]
    temp = temp + 1
    my_dict[some_key] = temp
    

    So if two processes were executing my_dict[some_key] += 1 in parallel, they could both read the same value with temp = my_dict[some_key], increment temp to the same new value, and the net effect would be that the dictionary value gets incremented only once. This can be demonstrated as follows:

    from multiprocessing import Pool, Manager, Lock
    
    def init_pool(the_lock):
        global lock
        lock = the_lock
    
    def worker1(d):
        for _ in range(1000):
            with lock:
                d['x'] += 1
    
    def worker2(d):
        for _ in range(1000):
            d['y'] += 1
    
    if __name__ == '__main__':
        lock = Lock()
        with Manager() as manager, \
            Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
            d = manager.dict()
            d['x'] = 0
            d['y'] = 0
            # worker1 will serialize with a lock
            pool.apply_async(worker1, args=(d,))
            pool.apply_async(worker1, args=(d,))
            # worker2 will not serialize with a lock:
            pool.apply_async(worker2, args=(d,))
            pool.apply_async(worker2, args=(d,))
            # wait for the 4 tasks to complete:
            pool.close()
            pool.join()
            print(d)
    

    Prints:

    {'x': 2000, 'y': 1162}
    

    Update

    As far as serialization goes:

    The BaseManager creates a server using, by default, a socket on Linux and a named pipe on Windows. So essentially every method you execute against a managed dictionary, for example, is much like a remote method call implemented with message passing. This also means the server could be running on a different computer altogether. But these method calls are not serialized; the object's methods themselves must be thread-safe, because each method call is run in a new thread.
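
    To make the remote-call nature concrete, here is a minimal sketch (my own illustration): every operation on the proxy object is forwarded to the server process, and the proxy even exposes the underlying call interface as _callmethod:

```python
from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        d['k'] = 1  # forwarded to the server as __setitem__('k', 1)
        print(type(d).__name__)  # DictProxy, not dict
        # the low-level remote-call interface the proxy uses internally:
        print(d._callmethod('__getitem__', ('k',)))  # 1
```
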

    The following is an example of creating our own managed type and having the server listen for requests, possibly from a different computer (although in this example the client runs on the same computer). The client calls increment on the managed object 1000 times across two threads, but the method implementation is not done under a lock, so the resulting value of self.x when we are all done is not 1000. Also, when we retrieve the value of x twice concurrently via get_x, we see that both invocations start more or less at the same time:

    from multiprocessing.managers import BaseManager
    from multiprocessing.pool import ThreadPool
    from threading import Event, Thread, get_ident
    import time
    
    class MathManager(BaseManager):
        pass
    
    class MathClass:
        def __init__(self, x=0):
            self.x = x
    
        def increment(self, y):
            temp = self.x
            time.sleep(.01)
            self.x = temp + 1
    
        def get_x(self):
            print(f'get_x started by thread {get_ident()}', time.time())
            time.sleep(2)
            return self.x
    
        def set_x(self, value):
            self.x = value
    
    def server(event1, event2):
        MathManager.register('Math', MathClass)
        manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
        manager.start()
        event1.set() # show we are started
        print('Math server running; waiting for shutdown...')
        event2.wait() # wait for shutdown
        print("Math server shutting down.")
        manager.shutdown()
    
    def client():
        MathManager.register('Math')
        manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
        manager.connect()
        math = manager.Math()
        pool = ThreadPool(2)
        pool.map(math.increment, [1] * 1000)
        results = [pool.apply_async(math.get_x) for _ in range(2)]
        for result in results:
            print(result.get())
    
    def main():
        event1 = Event()
        event2 = Event()
        t = Thread(target=server, args=(event1, event2))
        t.start()
        event1.wait() # server started
        client() # now we can run client
        event2.set()
        t.join()
    
    # Required for Windows:
    if __name__ == '__main__':
        main()
    

    Prints:

    Math server running; waiting for shutdown...
    get_x started by thread 43052 1629375415.2502146
    get_x started by thread 71260 1629375415.2502146
    502
    502
    Math server shutting down.
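
    One way to close the race demonstrated above (my own sketch, not part of the original answer) is to guard the read-modify-write inside the managed class with a threading.Lock. Since each proxy method call runs in its own server thread, an ordinary thread lock held in the server process is enough:

```python
import threading
import time
from multiprocessing.pool import ThreadPool

class SafeMathClass:
    """Like MathClass above, but increment() holds a lock."""
    def __init__(self, x=0):
        self.x = x
        self._lock = threading.Lock()  # lives in the server process

    def increment(self, y):
        # the lock serializes the concurrent server threads
        with self._lock:
            temp = self.x
            time.sleep(.001)  # widen the race window, as in the original
            self.x = temp + 1

    def get_x(self):
        with self._lock:
            return self.x

if __name__ == '__main__':
    # simulate concurrent server threads with a ThreadPool
    m = SafeMathClass()
    with ThreadPool(2) as pool:
        pool.map(m.increment, [1] * 100)
    print(m.get_x())  # 100
```

    Registering this class instead, with MathManager.register('Math', SafeMathClass), would make the 1000 client-side increments sum to exactly 1000.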