
Populating a list instance in Multiprocessing BaseManager class


I'm trying to create a way to pass a dataclass-like object (a plain class holding three lists) to several different child processes using the multiprocessing module. Right now I've tried:

import queue
from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager, NamespaceProxy

class TestManager(BaseManager):
    pass

class Test:
    def __init__(self):
        self.a = []
        self.b = []
        self.c = []

class TestProxy(NamespaceProxy):
    # Expose the same __dunder__ methods that NamespaceProxy exposes,
    # so attribute access is forwarded to the manager process.
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')


def popManager(queue, manager, i):
    manager.a.append(i)
    manager.b.append(i+1)
    manager.c.append(i**2)
    queue.put(True)


def managerParent():
    processes = []
    queues = []
    TestManager.register("Test", Test, TestProxy)
    with TestManager() as manager:
        t = manager.Test()
        for i in range(10):
            queues.append(Queue())
            processes.append(
                Process(target=popManager, args=(queues[i], t, i))
            )

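        for p in processes:
            p.start()
        print("started!")
        # Collect one True per queue; a queue that has already answered is
        # not read again, so a timeout cannot leave the loop waiting forever.
        pending = list(queues)
        while pending:
            try:
                pending[0].get(timeout=0.2)
                pending.pop(0)
            except queue.Empty:
                print("Processes still running!")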
        
        print(t.a, t.b, t.c)

if __name__ == "__main__":
    managerParent()

which runs just fine, but the list instances are all empty. It outputs

[] [] []

where I'd like it to output something like

[0, 1, 2, ..., 9] [1, 2, 3, ..., 10] [0, 1, 4, ..., 81]

EDIT:

I discovered that if I don't use the append method, but modify popManager to be:

def popManager(queue, manager, i):
    manager.a += [i]
    manager.b += [i+1]
    manager.c += [i**2]
    queue.put(True)

then it works just fine and I get the expected result. Can someone please explain why I can't use the append method?


Solution

  • See Managed dict of list not updated in multiprocessing when using += operator.

    There is probably a Stack Overflow post that covers the same issue with managed lists, and if I had come upon one readily I would have closed this as a duplicate. In essence, the statement t = manager.Test() creates a proxy object for a Test instance that resides in the manager's process. When you invoke a method or access an attribute supported by the proxy (whether that proxy is explicitly supplied or autogenerated), the method name and its arguments are marshalled to the manager process and executed there against the actual object. But when you execute, for example, manager.a.append(i), the expression manager.a fetches the current value of self.a and returns a pickled copy of that list to the caller. You then append a value to your local copy, and the proxy (and hence the manager process) never learns that you did so.

    By contrast, manager.a += [i] is equivalent to tmp = manager.a; tmp += [i]; manager.a = tmp. The final assignment goes through the proxy's __setattr__, so the extended copy is sent back to the manager process. That is why the += version appears to work, although it shares the race condition discussed below.

    By the way, manager is not a descriptive name for a proxy to a Test instance; t would have been a much better choice, since that is the name you use for it in the main process.
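    To see the copy semantics in isolation, here is a minimal sketch that uses the built-in Namespace proxy returned by multiprocessing.Manager() instead of the custom Test class; NamespaceProxy forwards attribute reads and writes the same way. The function name demo is just for illustration:

```python
from multiprocessing import Manager


def demo():
    """Show why append() on a proxied attribute is lost but += is kept."""
    with Manager() as manager:
        ns = manager.Namespace()  # proxy to a Namespace living in the manager process
        ns.a = []                 # __setattr__ is forwarded, so the manager stores []

        # ns.a returns a pickled *copy* of the list; append() mutates only that copy.
        ns.a.append(1)
        after_append = ns.a

        # ns.a += [2] expands to: tmp = ns.a; tmp += [2]; ns.a = tmp
        # The final assignment goes through __setattr__, so the new list is sent back.
        ns.a += [2]
        after_iadd = ns.a

        return after_append, after_iadd


if __name__ == "__main__":
    print(demo())  # ([], [2])
```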

    This is how I would re-code your example using your definition of Test (I have made the code more Pythonic):

    from multiprocessing import Process
    from multiprocessing.managers import BaseManager, NamespaceProxy
    import time
    
    class TestManager(BaseManager):
        pass
    
    class Test:
        def __init__(self):
            self.a = []
            self.b = []
            self.c = []
    
    def pop_manager(t, i):
        l = t.a
        # Demonstrate race condition by giving up control here:
        time.sleep(.2)
        l.append(i)
        t.a = l  # Force an update of t.a
    
        l = t.b
        l.append(i*2)
        t.b = l
    
        l = t.c
        l.append(i**2)
        t.c = l
    
    
    class TestProxy(NamespaceProxy):
        # Expose the same __dunder__ methods that NamespaceProxy exposes,
        # so attribute access is forwarded to the manager process.
        _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    
    def manager_parent():
        TestManager.register("Test", Test, TestProxy)
        with TestManager() as manager:
            t = manager.Test()
            processes = [
                Process(target=pop_manager, args=(t, i))
                for i in range(10)
            ]
            for p in processes:
                p.start()
            for p in processes:
                p.join()
    
            print(t.a, t.b, t.c)
    
    if __name__ == "__main__":
        manager_parent()
    

    Prints:

    [8] [0, 2, 8, 10, 14, 4, 18, 12, 16] [0, 1, 16, 25, 49, 4, 81, 64]
    

    But now you have a race condition because the sequence:

        l = t.a
        l.append(i)
        t.a = l  # Force an update of t.a
    

    ... is not atomic. Two processes can both read the same value with l = t.a, and each then appends to its own copy; whichever process executes t.a = l last overwrites the other's update. In the above code I inserted a call to time.sleep() to make this visible.
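    If you want to keep the fetch/modify/store pattern, one option (my own assumption, not part of the code above) is to serialize the whole sequence with a multiprocessing.Lock handed to every child process. This is a sketch; pop_manager_locked and collect are hypothetical names:

```python
import time
from multiprocessing import Lock, Process
from multiprocessing.managers import BaseManager, NamespaceProxy


class TestManager(BaseManager):
    pass


class Test:
    def __init__(self):
        self.a = []
        self.b = []
        self.c = []


class TestProxy(NamespaceProxy):
    # Same dunder methods that NamespaceProxy exposes.
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')


def pop_manager_locked(t, lock, i):
    # Hypothetical variant of pop_manager: the lock is held for the whole
    # fetch/modify/store sequence, so no other process can read t.a
    # between our read and our write-back.
    with lock:
        lst = t.a
        time.sleep(0.05)  # same kind of deliberate delay as above; now harmless
        lst.append(i)
        t.a = lst

        lst = t.b
        lst.append(i + 1)
        t.b = lst

        lst = t.c
        lst.append(i ** 2)
        t.c = lst


def collect(n=10):
    TestManager.register("Test", Test, TestProxy)
    lock = Lock()  # shared with every child through the Process args
    with TestManager() as manager:
        t = manager.Test()
        processes = [
            Process(target=pop_manager_locked, args=(t, lock, i))
            for i in range(n)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        return t.a, t.b, t.c


if __name__ == "__main__":
    print(*collect())
```

    The cost is that the children now run their updates strictly one at a time.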

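    So I would instead consider giving Test an append_values method. This is cleaner, and no update can be lost, because the appends are executed by the manager process directly on the real lists rather than on copies: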

    from multiprocessing import Process
    from multiprocessing.managers import BaseManager, NamespaceProxy
    
    class TestManager(BaseManager):
        pass
    
    class Test:
        def __init__(self):
            self.a = []
            self.b = []
            self.c = []
    
        def append_values(self, value1, value2, value3):
            self.a.append(value1)
            self.b.append(value2)
            self.c.append(value3)
    
    class TestProxy(NamespaceProxy):
        # Expose the same __dunder__ methods as NamespaceProxy,
        # plus the append_values method defined below.
        _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'append_values')

        def append_values(self, value1, value2, value3):
            return self._callmethod('append_values', args=(value1, value2, value3))
    
    
    def pop_manager(t, i):
        t.append_values(i, i+1, i**2)
    
    
    def manager_parent():
        TestManager.register("Test", Test, TestProxy)
        with TestManager() as manager:
            t = manager.Test()
            processes = [
                Process(target=pop_manager, args=(t, i))
                for i in range(10)
            ]
            for p in processes:
                p.start()
            for p in processes:
                p.join()
    
            print(t.a, t.b, t.c)
    
    if __name__ == "__main__":
        manager_parent()
    

    Prints:

    [0, 1, 4, 3, 5, 9, 6, 2, 8, 7] [1, 2, 5, 4, 6, 10, 7, 3, 9, 8] [0, 1, 16, 9, 25, 81, 36, 4, 64, 49]
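    One caveat: in CPython the manager's server handles each client connection in its own thread, so two append_values calls from different processes can run at the same time. Each individual list.append is atomic in CPython, so nothing is lost, but the three appends of one call could in principle interleave with another call's, leaving a, b and c in different orders. If the lists must stay aligned (an assumption about your requirements), a threading.Lock inside Test closes that gap. In this sketch the _lock attribute is an addition that is not part of the code above:

```python
import threading
from multiprocessing import Process
from multiprocessing.managers import BaseManager, NamespaceProxy


class TestManager(BaseManager):
    pass


class Test:
    def __init__(self):
        self.a = []
        self.b = []
        self.c = []
        # Assumed addition: created inside the manager process (manager.Test()
        # runs __init__ there), so it is never pickled. It serializes the
        # server threads that call append_values.
        self._lock = threading.Lock()

    def append_values(self, value1, value2, value3):
        # Hold the lock so one call's three appends never interleave with another's.
        with self._lock:
            self.a.append(value1)
            self.b.append(value2)
            self.c.append(value3)


class TestProxy(NamespaceProxy):
    # Expose the same __dunder__ methods as NamespaceProxy, plus append_values.
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'append_values')

    def append_values(self, value1, value2, value3):
        return self._callmethod('append_values', (value1, value2, value3))


def pop_manager(t, i):
    t.append_values(i, i + 1, i ** 2)


def manager_parent(n=10):
    TestManager.register("Test", Test, TestProxy)
    with TestManager() as manager:
        t = manager.Test()
        processes = [Process(target=pop_manager, args=(t, i)) for i in range(n)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        return t.a, t.b, t.c


if __name__ == "__main__":
    print(*manager_parent())
```

    The order of the values still depends on process scheduling; only the alignment of a, b and c is guaranteed.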