I am using multiprocessing's shared memory to share a numpy array between tasks. While each task should originally only read the array, I was curious whether writing is possible as well. I wrote the following example to test this in a situation similar to my actual use case. In this toy example, each process "opens" the array, adds 1 to the first element (which is initialized as 0.0), and returns it. The returned values at the first index should therefore be [1, 2, 3, ...], and mostly they are, but if I run the script a few times, every now and then two of the values come out the same.
Is there a way to avoid these conflicts? I know that in this toy example it makes little sense (and would bring no speedup if the other processes have to wait), but I found no way to control access to the array at all, so any pointers would be appreciated; my actual problem is different, but has the same access pattern.
import numpy as np
from multiprocessing import shared_memory, Pool
from itertools import repeat
import time


def test_shm(N=500, n_proc=8, name='example'):
    # create a shared memory block and copy the array into it
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0
    shm = shared_memory.SharedMemory(
        name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    del b  # release the buffer first, otherwise shm.close() raises a BufferError
    shm.close()
    with Pool(n_proc) as p:
        res = p.starmap(
            work, zip(range(n_proc), repeat(name), repeat(a.dtype), repeat(N)))
    for r in res:
        print(f'{r[0]}\t{r[1]}')
    res = np.array([r[0] for r in res])
    print('all good' if np.all(np.sort(res) == 1 + np.arange(n_proc)) else 'not all good')
    shm.unlink()


def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    val = arr[0, 0, 0:2].copy()  # read the first two elements
    val[0] += 1.0                # increment the copy
    arr[0, 0, 0] = val[0]        # write it back (unsynchronized!)
    del arr  # release the buffer before closing the shared memory
    shm.close()
    return val


if __name__ == '__main__':
    test_shm()
You have a data race on this array: the processes do concurrent reads and writes without any synchronization. That is a bug even in multithreaded code operating on plain lists; it is not specific to shared memory. In this case you need a multiprocessing.Lock to protect the reads and writes.
On Windows this is slightly more complicated, as the lock has to be passed to the worker processes through the pool's initializer.
# global scope
lock_var = None

def set_lock(value):
    global lock_var
    lock_var = value

...
lock = Lock()  # needs: from multiprocessing import Lock
with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
    ...

def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    with lock_var:  # serialize the read-modify-write on the shared element
        val = arr[0, 0, 0:2].copy()
        val[0] += 1.0
        arr[0, 0, 0] = val[0]
    del arr  # release the buffer before closing the shared memory
    shm.close()
    return val
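For completeness, here is a minimal end-to-end sketch assembling your code with the lock (I shrank N and the sleep only to keep the example quick; the structure is otherwise unchanged):

import time
import numpy as np
from multiprocessing import shared_memory, Pool, Lock
from itertools import repeat

lock_var = None  # set in each worker by the pool initializer


def set_lock(value):
    global lock_var
    lock_var = value


def work(i, name, dtype, N):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    time.sleep(0.1)  # stand-in for real work
    with lock_var:   # only one process at a time does the read-modify-write
        val = arr[0, 0, 0:2].copy()
        val[0] += 1.0
        arr[0, 0, 0] = val[0]
    del arr  # release the buffer before closing
    shm.close()
    return val


def test_shm(N=50, n_proc=8, name='example'):
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0
    shm = shared_memory.SharedMemory(name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    del b
    shm.close()
    lock = Lock()  # created before the pool so the workers can inherit it
    with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
        res = p.starmap(work, zip(range(n_proc), repeat(name),
                                  repeat(a.dtype), repeat(N)))
    first = np.sort([r[0] for r in res])
    print('all good' if np.all(first == 1 + np.arange(n_proc)) else 'not all good')
    shm.unlink()


if __name__ == '__main__':
    test_shm()

With the lock in place the increments can no longer interleave, so the sorted first elements always come out as 1, 2, ..., n_proc.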