I am trying to make use of Manager() to share a dictionary between processes and tried out the following code:
from multiprocessing import Manager, Pool

def f(d):
    d['x'] += 2

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    d['x'] = 2
    p = Pool(4)
    for _ in range(2000):
        p.map_async(f, (d,))  # apply_async, map
    p.close()
    p.join()
    print(d)  # expects this result --> {'x': 4002}
Using map_async and apply_async, the printed result is always different (e.g. {'x': 3838}, {'x': 3770}). However, using map gives the expected result. I have also tried using Process instead of Pool, and the results differ there too.

Any insights? Is something about the non-blocking calls and race conditions not being handled by the manager?
When you call map (rather than map_async), it blocks until the worker processes have finished all the requests you passed, which in your case is just one call to function f. So even though you have a pool size of 4, you are in essence running the 2000 tasks one at a time. To actually parallelize execution, you could make a single call, p.map(f, [d]*2000), instead of the loop.
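For illustration, here is a minimal sketch of that single-call form. Note that because the 2000 calls now genuinely run in parallel, it has the same race on d['x'] as the map_async version, so it still needs the lock shown further down to reach 4002 reliably:

from multiprocessing import Manager, Pool

def f(d):
    d['x'] += 2

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        d['x'] = 2
        with Pool(4) as p:
            # one blocking call; the 2000 tasks are split across the 4 workers
            p.map(f, [d] * 2000)
        # without a lock the concurrent increments can race,
        # so the printed total may fall short of 4002
        print(d)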
But when you call map_async, you do not block; instead you get back a result object. A call to get on that result object will block until the process finishes and will return the result of the function call. So now you are running up to 4 processes at a time, but the update to the dictionary is not serialized across the processes. I have modified the code to force serialization of d['x'] += 2 by using a multiprocessing lock; you will see that the result is now 4002.
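To see why the lock is needed: against a managed dict, d['x'] += 2 is not a single atomic operation. The proxy performs a fetch and then a store, roughly equivalent to the sketch below (tmp is just a hypothetical name for illustration), so two workers can fetch the same value and one increment gets lost.

tmp = d['x']       # remote read through the manager proxy
d['x'] = tmp + 2   # remote write; another worker may have read the same tmp in between

Here is the modified code: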
from multiprocessing import Manager, Pool, Lock

def f(d):
    lock.acquire()
    d['x'] += 2
    lock.release()

def init(l):
    # make the shared lock visible as a global in each worker process
    global lock
    lock = l

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        d['x'] = 2
        lock = Lock()  # the multiprocessing lock that is sharable by all the processes
        p = Pool(4, initializer=init, initargs=(lock,))
        results = []  # if the function returned a result we wanted
        for _ in range(2000):
            results.append(p.map_async(f, (d,)))  # apply_async, map
        """
        for i in range(2000):  # if the function returned a result we wanted
            results[i].get()  # wait for everything to finish
        """
        p.close()
        p.join()
        print(d)