I have the following code:
    from multiprocessing import Process, Manager
    manager = Manager()
A function that appends 'x' to the list if its length is less than 4, or otherwise rebinds it to a new list with the initial value 'y':
    def f(my_array):
        if len(my_array) < 4:
            my_array.append('x')
        else:
            my_array = ['y']
        print(my_array)
Initialization of the list and creation of the processes:
    if __name__ == '__main__':
        my_array = manager.list(['a', 'b', 'c'])
        p1 = Process(target=f, args=(my_array,))
        p2 = Process(target=f, args=(my_array,))
        p3 = Process(target=f, args=(my_array,))
        p4 = Process(target=f, args=(my_array,))
        p5 = Process(target=f, args=(my_array,))
        p1.start()
        p2.start()
        p3.start()
        p4.start()
        p5.start()
        p1.join()
        p2.join()
        p3.join()
        p4.join()
        p5.join()
Output I got:
['a', 'b', 'c', 'x']
['y']
['y']
['y']
['y']
I don't understand why the list is appended to only once. I expected the third output line to show ['y'] with 'x' appended, i.e. ['y', 'x'], the fourth ['y', 'x', 'x'], and so on. It seems as if the shared memory is leaky or does not allow changes from functions in multiple processes. What can I do to get the behavior I want?
Synchronization is one point missing in your example. A manager.list is just a normal list living in a separate server-process, which your worker-processes can modify through proxy objects. Several of your processes happen to check len(my_array) at the same time. There is no synchronization telling them to wait until another process has finished its operation, which consists of the length-check plus an action that depends on the result of that check. Your update is not an atomic operation; you need to make it one by holding a manager.Lock() around it.
There's another problem in your code: you re-bind my_array to point to a normal list ['y'] instead of updating / modifying the shared manager.list in place. The processes which execute my_array = ['y'] do not modify the manager.list at all; it keeps its value ['a', 'b', 'c', 'x'] from the first modification by the first worker-process until the end of your program.
    from multiprocessing import Process, Manager

    def f(my_array, lock):
        with lock:
            if len(my_array) < 4:
                my_array.append('x')
            else:
                my_array[:] = []  # clear list in place by assigning an
                                  # empty list to a slice of the manager.list
                my_array.append('y')
            print(my_array)
    if __name__ == '__main__':

        N_WORKERS = 5

        with Manager() as manager:

            my_array = manager.list(['a', 'b', 'c'])
            lock = manager.Lock()

            pool = [
                Process(target=f, args=(my_array, lock))
                for _ in range(N_WORKERS)
            ]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

            # Leaving the context-manager block will shut down the
            # manager-process. We need to convert the manager-list to a
            # normal list in the parent to keep its values available for
            # further processing in the parent.
            result = list(my_array)

        print(f'result: {result}')
Example Output:
['a', 'b', 'c', 'x']
['y']
['y', 'x']
['y', 'x', 'x']
['y', 'x', 'x', 'x']
result: ['y', 'x', 'x', 'x']
Process finished with exit code 0