
Synchronization of writing to shared memory (list) in Python multiprocessing


I have the following code:

import multiprocessing
from multiprocessing import Process

manager = multiprocessing.Manager()

Function that appends to the list if its length is less than 4, or otherwise re-assigns it to a new list with the initial value 'y'.

def f(my_array):
    if len(my_array) < 4:
        my_array.append('x')
    else:
        my_array = ['y']
    print(my_array)

Initialization of the list and creation of the processes.

if __name__ == '__main__':
    my_array = manager.list(['a', 'b', 'c'])

    p1 = Process(target=f, args=(my_array,))
    p2 = Process(target=f, args=(my_array,))
    p3 = Process(target=f, args=(my_array,))
    p4 = Process(target=f, args=(my_array,))
    p5 = Process(target=f, args=(my_array,))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    p5.join()

Output I got:

['a', 'b', 'c', 'x']
['y']
['y']
['y'] 
['y']

I don't understand why the list is appended to only once. I expected the third output line to show ['y'] with 'x' appended, i.e. ['y', 'x'], the fourth one ['y', 'x', 'x'], and so on. It seems as if the shared memory is leaky or does not allow changes from multiple processes. What can I do to get the intended behavior?


Solution

  • Synchronization is the main point missing from your example. A manager.list is just an ordinary list kept in a separate server process, which your worker processes can modify through proxy objects. Several of your processes happen to check len(my_array) at the same time.

    There is no synchronization telling them to wait until another process has finished its operation, which consists of the length check plus an action that depends on the result of that check. Your update is not an atomic operation; you need to make it one by wrapping it in a manager.Lock().

    There's another problem in your code: you re-bind my_array to a plain local list ['y'] instead of updating / modifying the shared manager.list in place. The processes that execute my_array = ['y'] never touch the manager.list at all; it keeps its value ['a', 'b', 'c', 'x'] from the first worker's modification until the end of your program.
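    The difference between re-binding a name and mutating the shared list in place can be shown even without worker processes. A minimal sketch (the names shared and alias are only illustrative):

    ```python
    from multiprocessing import Manager

    if __name__ == '__main__':
        with Manager() as manager:
            shared = manager.list(['a', 'b', 'c'])

            alias = shared       # a second name for the same proxy object
            alias = ['y']        # re-binds the local name only; shared list untouched
            print(list(shared))  # ['a', 'b', 'c']

            shared[:] = ['y']    # slice assignment mutates the managed list in place
            print(list(shared))  # ['y']
    ```

    This is exactly why the solution below uses my_array[:] = [] followed by append instead of my_array = ['y'].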

    from multiprocessing import Process, Manager
    
    
    def f(my_array, lock):
        with lock:
            if len(my_array) < 4:
                my_array.append('x')
            else:
                my_array[:] = []  # clear the list in place by assigning an
                # empty list to the full slice of the manager.list
                my_array.append('y')
        print(my_array)
    
    
    if __name__ == '__main__':
    
        N_WORKERS = 5
    
        with Manager() as manager:
    
            my_array = manager.list(['a', 'b', 'c'])
            lock = manager.Lock()
    
            pool = [
                Process(target=f, args=(my_array, lock)) for _ in range(N_WORKERS)
            ]
    
            for p in pool:
                p.start()
            for p in pool:
                p.join()
    
            # Leaving the context-manager block will shut down the manager-process.
            # We need to convert the manager-list to a normal list in the parent
            # to keep its values available for further processing in the parent.
            result = list(my_array)
    
        print(f'result: {result}')
    

    Example Output:

    ['a', 'b', 'c', 'x']
    ['y']
    ['y', 'x']
    ['y', 'x', 'x']
    ['y', 'x', 'x', 'x']
    result: ['y', 'x', 'x', 'x']
    
    Process finished with exit code 0