
Conflict when using multiprocessing's shared memory


I am using multiprocessing's shared memory to share a numpy array between tasks. Each task originally only needs to read the array, but I was curious whether writing is also possible, so I wrote the following example, which mirrors the way I actually use it. In this toy example, each process "opens" the array, adds 1 to the first element (which is initialized to 0.0), and returns it. The returned first elements should therefore be [1, 2, 3, ...], which is mostly the case, but if I run it a few times, every now and then two of the values are the same.

Is there a way to avoid these conflicts? I know that serializing access makes little sense in this example (there would be no speedup if the other processes have to wait), but I found no way to control access to the array, so any pointers that would help with my actual, different problem would be appreciated.

import numpy as np
from multiprocessing import shared_memory, Pool
from itertools import repeat
import time


def test_shm(N=500, n_proc=8, name='example'):
    # create a shared memory array
    a = np.random.rand(N, N, N).astype(np.float64)
    a[0, 0, 0] = 0.0

    shm = shared_memory.SharedMemory(
        name=name, create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    shm.close()

    with Pool(n_proc) as p:
        res = p.starmap(
            work, zip(range(n_proc), repeat(name), repeat(a.dtype), repeat(N)))

    for r in res:
        print(f'{r[0]}\t{r[1]}')

    res = np.array([r[0] for r in res])
    print('not ' * int(~np.all(np.sort(res) == 1 + np.arange(n_proc))) + 'all good')

    shm.unlink()


def work(i, name, dtype, N=500):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
    # now do some work
    time.sleep(2)
    val = arr[0, 0, 0:2].copy()
    val[0] += 1.0
    arr[0, 0, 0] = val[0]
    shm.close()

    return val


if __name__ == '__main__':
    test_shm()

Solution

  • You have a data race on this array: you are doing concurrent reads and writes without synchronization. This is a bug even in multithreaded code with lists; it is not limited to shared memory. In this case you need a multiprocessing.Lock to protect the reads and writes.

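    On Windows (and with the spawn start method in general) this is slightly more complicated, as the lock cannot be sent as a task argument and needs to be passed to the workers through the pool initializer.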

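    # global scope
    from multiprocessing import Lock

    lock_var = None  # set in every worker process by set_lock()

    def set_lock(value):
        global lock_var
        lock_var = value
    
    ...
    lock = Lock()  # create the lock in the parent, before starting the pool
    with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
    ...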
    
    def work(i, name, dtype, N=500):
        shm = shared_memory.SharedMemory(name=name)
        arr = np.ndarray((N, N, N), dtype=dtype, buffer=shm.buf)
        # now do some work
        time.sleep(2)
        with lock_var:
            val = arr[0, 0, 0:2].copy()
            val[0] += 1.0
            arr[0, 0, 0] = val[0]
        shm.close()
    
        return val
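
For completeness, the pieces above can be put together into a small self-contained script. This is only a sketch under a few assumptions: the array is shrunk to four float64 elements so it runs quickly, the shared memory name is left to the OS instead of being fixed to 'example', and run_demo is a hypothetical helper name that does not appear in the question. The parent also keeps its handle open until the pool has finished, which is the safer choice on Windows, where the block is released once the last handle is closed.

```python
import numpy as np
from multiprocessing import Lock, Pool, shared_memory

lock_var = None  # per-process global, filled in by the pool initializer


def set_lock(value):
    """Pool initializer: remember the shared lock in this worker process."""
    global lock_var
    lock_var = value


def work(name, shape, dtype):
    """Attach to the shared array and increment its first element by one."""
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    with lock_var:
        # The read and the write form one critical section, so no other
        # process can slip in between them.
        new_value = float(arr[0]) + 1.0
        arr[0] = new_value
    del arr  # drop the numpy view before closing the mapping
    shm.close()
    return new_value


def run_demo(n_proc=4, n_tasks=8):
    """Hypothetical driver: run n_tasks increments on n_proc workers."""
    shape, dtype = (4,), np.float64
    shm = shared_memory.SharedMemory(create=True, size=8 * shape[0])
    try:
        arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        arr[:] = 0.0
        lock = Lock()  # created in the parent, handed over via initargs
        with Pool(n_proc, initializer=set_lock, initargs=(lock,)) as p:
            res = p.starmap(work, [(shm.name, shape, dtype)] * n_tasks)
        del arr
    finally:
        shm.close()
        shm.unlink()
    return sorted(res)


if __name__ == '__main__':
    print(run_demo())
```

Because every increment happens while the lock is held, each task sees a distinct value, so the sorted result should contain 1.0 through n_tasks with no duplicates. Only the read-modify-write needs to be inside the lock; any expensive read-only work can stay outside it so the processes still run in parallel.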