Tags: python, python-3.x, multiprocessing, multiprocessing-manager

multiprocessing.pool with manager and async methods


I am trying to make use of Manager() to share a dictionary between processes, and tried out the following code:

from multiprocessing import Manager, Pool

def f(d):
    d['x'] += 2

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    d['x'] = 2
    p = Pool(4)

    for _ in range(2000):
        p.map_async(f, (d,))  # also tried apply_async and map

    p.close()
    p.join()

    print(d)  # expected result --> {'x': 4002}

Using map_async and apply_async, the printed result is always different (e.g. {'x': 3838}, {'x': 3770}). However, using map gives the expected result. I have also tried using Process instead of Pool, and the results differ from the expected value there too.

Any insights? Is it something about the non-blocking calls, i.e. are race conditions not handled by the Manager?


Solution

  • When you call map (rather than map_async), it blocks until the workers have finished all the tasks you passed it, which in your case is just one call to function f. So even though you have a pool size of 4, you are in essence running the 2000 tasks one at a time. To actually parallelize execution, you should have done a single p.map(f, [d]*2000) instead of the loop (a sketch of that variant follows the code below).

    But when you call map_async, you do not block and are returned a result object. A call to get on the result object blocks until the task finishes and returns the result of the function call. So now you are running up to 4 tasks at a time. But the update to the dictionary is not serialized across the processes: d['x'] += 2 is really a read of d['x'] followed by a write, two separate calls on the proxy, so two workers can read the same value and one of the increments gets lost. I have modified the code to force serialization of d['x'] += 2 by using a multiprocessing lock (an alternative that uses the manager's own lock is sketched at the end). You will see that the result is now {'x': 4002}.

    from multiprocessing import Manager, Pool, Lock
    
    
    def f(d):
        with lock:  # serialize the read-modify-write on the shared dict
            d['x'] += 2
    
    def init(l):
        # runs once in each worker process; makes the shared lock available as a global
        global lock
        lock = l
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            d['x'] = 2
            lock = Lock()  # one lock shared by every worker in the pool
            p = Pool(4, initializer=init, initargs=(lock,))  # init makes the lock available in each worker
    
            results = []  # collect the AsyncResult objects, in case f returned something we wanted
            for _ in range(2000):
                results.append(p.map_async(f, (d,)))  # apply_async behaves the same way
            """
            for result in results:  # if the function returned a result we wanted
                result.get()  # wait for everything to finish and fetch the return values
            """
            p.close()
            p.join()
            print(d)
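
    As mentioned above, the loop of map_async calls can be replaced by a single blocking p.map that hands all 2000 tasks to the pool at once. A minimal sketch of that variant, reusing the same f and init as in the code above; note that the lock is still required here, because the tasks now genuinely run concurrently:

    from multiprocessing import Manager, Pool, Lock

    def f(d):
        with lock:  # serialize the read-modify-write on the shared dict
            d['x'] += 2

    def init(l):
        global lock
        lock = l

    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            d['x'] = 2
            lock = Lock()
            p = Pool(4, initializer=init, initargs=(lock,))
            p.map(f, [d] * 2000)  # one call, 2000 tasks, spread across the 4 workers
            p.close()
            p.join()
            print(d)  # {'x': 4002}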
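
    Alternatively (a sketch, not part of the original answer): the manager can create the lock itself via manager.Lock(). The returned proxy is picklable, so it can be passed to the workers as an ordinary task argument and no pool initializer is needed:

    from multiprocessing import Manager, Pool

    def f(args):
        d, lock = args  # both are manager proxies and pickle cleanly
        with lock:
            d['x'] += 2

    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            d['x'] = 2
            lock = manager.Lock()  # the lock itself lives in the manager process
            with Pool(4) as p:
                p.map(f, [(d, lock)] * 2000)  # blocks until all 2000 tasks finish
            print(d)  # {'x': 4002}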