python numpy multiprocessing shared-memory

SharedMemory numpy array multiprocessing problem


The problem

I have a lot of data that I first load into RAM with SharedMemory and then read from many child processes with multiprocessing.Pool.map.

The code

This is a simplified version (not a runnable example) of what I am using:

class SharedObject: # wrapper of SharedMemory passed to subprocesses
  def __init__(self, blablabla):
    self.shmem = SharedMemory(create=True, size=numbers) # the reference to shmem lives as long as the instance does (shmem does not get GCed)
    temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
    temp_arr[:] = ...lot of data... # this array gets destroyed after __init__ finishes

  def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
    selected = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
    return selected.__getitem__(indices)

# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
  result = list(pool.map(f, shobj))  # f calls shobj.__getitem__

The strange behavior

I have 64 GB of RAM in my PC. If I run the above algorithm with a little data, everything runs smoothly, but if I load a lot (~40 GB), then I get the following error:

in __enter__
return self._semlock.__enter__()
       ^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\mnandcharvim\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 321, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(

(basically a lock misuse)

The data is only read, so it would be better for me if I could load it into a read-only portion of memory so as to avoid locks. This question points out that SharedMemory is lock-free, but at this point, based on the error I get, I am not sure.
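To make the intent concrete, the kind of access I am after looks roughly like this sketch (read_slice and its parameters are illustrative, not my actual code): each worker attaches to the existing segment by name, reads, and detaches:

from multiprocessing.shared_memory import SharedMemory
import numpy as np

def read_slice(name, shape, dtype, indices):
    shm = SharedMemory(name=name)      # attach to the existing segment by name
    try:
        arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        arr.flags.writeable = False    # make this view read-only
        return np.copy(arr[indices])   # copy out before the mapping is closed
    finally:
        shm.close()                    # detach this process's mapping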

2nd version

I also tried to make the code conform more closely to the example in the official documentation that I linked:

shmems = [] # module-level variable (same module as SharedObject), as suggested in some answers
class SharedObject:
  def __init__(self, blablabla):
    shmem = SharedMemory(name=self.name, create=True, size=numbers) # this reference is destroyed after __init__
    shmems.append(shmem) # but this one outlives it
    temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf) # the local shmem, since self.shmem does not exist in this version
    temp_arr[:] = ...lot of data...

  def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
    shmem = SharedMemory(name=self.name) # added line
    selected = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
    return selected.__getitem__(indices)

# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
  result = list(pool.map(f, shobj))  # f calls shobj.__getitem__

2nd strange behavior

This second version errors out at the added line shmem = SharedMemory(name=self.name), saying that I do not have enough system resources for the mmap, which is odd, since the data has already been mapped into RAM: the resources were sufficient for the supposedly one-time initial load.
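If the cause is that every __getitem__ call opens a fresh SharedMemory(name=...) that is never closed (which is only my guess, not something I have confirmed), a per-process cache of the attachment would avoid piling up mappings. A sketch, with _attached and _get_shmem as illustrative names:

_attached = {} # per-process cache: segment name -> SharedMemory handle

def _get_shmem(name):
    # attach at most once per worker process instead of once per call
    if name not in _attached:
        _attached[name] = SharedMemory(name=name)
    return _attached[name]

# then, in __getitem__:
#   selected = np.ndarray(self.shape, dtype=self.dtype, buffer=_get_shmem(self.name).buf)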


Minimal reproducible example

To reproduce, adjust the arguments to your system resources so as not to overflow RAM. The example deadlocks in one of the iterations of the main for loop; pressing CTRL+C shows the error. I am using Python 3.12.2 (64-bit) on Windows 10.

script.py:

from multiprocessing.shared_memory import SharedMemory
import numpy as np
import multiprocessing

class SharedObject:
    def __init__(self):
        self.shape = (2000000, 2049)
        self.shmem = SharedMemory(create=True, size=2000000*8196) # 2049 float32 values per row = 8196 bytes per row
        self.dtype = np.float32
        _temp_arr=np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        _temp_arr[:] = 0.0 # fill the whole array with placeholder zeros

def f(data: SharedObject) -> int:
    print("hello")
    return 5

if __name__ == "__main__":
    shobj = SharedObject()

    for j in range(20):
        print(j)
        with multiprocessing.Pool() as pool:
            list(pool.map(f, [shobj]*256)) # 256 calls to f

Solution

  • The code in the question is flawed.

    Using a numpy array in shared memory in conjunction with multiprocessing is probably best explained by example.

In this example we create a one-dimensional numpy array of integers in shared memory. Each time the worker function (process()) runs, we add 1 to each element of the array.

    Note the essential use of a lock to protect the shared memory while it's being modified.

    process() is invoked NPROCS times. Therefore each element of the array should be equal to NPROCS once all processing has completed. We assert that this is True.

    from multiprocessing.shared_memory import SharedMemory
    import numpy as np
    from multiprocessing import Pool, Lock
    
    SHM = "my_shared_memory_segment"
    N = 1_000
    DTYPE = int
    NPROCS = 10_000
    
    def process():
        global lock
        try:
            shm = SharedMemory(name=SHM)
            with lock:
                data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
                data[:] += 1
        finally:
            shm.close()
    
    def init_lock(lock_):
        global lock
        lock = lock_
    
    def main():
        try:
            lock = Lock()
            size = N * np.dtype(DTYPE).itemsize
            shm = SharedMemory(name=SHM, create=True, size=size)
            data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
            data.fill(0)
            pool = Pool(initializer=init_lock, initargs=(lock,))
            try:
                for _ in range(NPROCS):
                    pool.apply_async(process)
            finally:
                pool.close()
                pool.join()
            assert np.all(data == NPROCS)
        finally:
            shm.close()
            shm.unlink()
    
    if __name__ == "__main__":
        main()
    

    You should be able to adapt this to suit your specific needs.
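    Incidentally, if the workers only ever read the array, as in the question, the lock can be dropped: concurrent reads of memory that is never written after initialization need no synchronization. A minimal read-only sketch of the worker under that assumption (process_readonly is an illustrative name):

    def process_readonly():
        try:
            shm = SharedMemory(name=SHM)
            data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
            return int(data[0])  # copy a value out before the mapping is closed
        finally:
            shm.close()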

    EDIT:

Apparently, there's a need for a rather large 2-dimensional ndarray, as seen in the code posted in the question.

    Here is a significant edit to my original answer that deals with that requirement.

The code below runs without error on macOS 15.0.1 with Python 3.13.0.

    import numpy as np
    from multiprocessing import Pool, Lock
    from multiprocessing.shared_memory import SharedMemory
    
    SHM = "my_shared_memory_segment"
    N = 2_000_000
    C = 2_048
    DTYPE = np.float32
    NPROCS = 256
    
    def process(_):
        global lock
        try:
            shm = SharedMemory(name=SHM)
            with lock:
                data = get_array(shm)
                data[0][:] += 1  # each task increments only the first row
        finally:
            shm.close()
    
    def init_lock(lock_):
        global lock
        lock = lock_
    
    def get_array(shm):
        return np.ndarray((N, C), DTYPE, shm.buf)
    
    def main():
        try:
            lock = Lock()
            size = N * C * np.dtype(DTYPE).itemsize
            shm = SharedMemory(name=SHM, create=True, size=size)
            data = get_array(shm)
            data.fill(0.0)
            with Pool(initializer=init_lock, initargs=(lock,)) as pool:
                pool.map(process, range(NPROCS))
        finally:
            shm.close()
            shm.unlink()
    
    if __name__ == "__main__":
        main()
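
    One further refinement, sketched below under the same assumptions and reusing SHM, get_array, Lock and Pool from the listing above (init_worker is an illustrative name, not part of the code above): attach to the segment once per worker in the Pool initializer rather than once per task, so that each worker reuses a single mapping for all of its tasks:

    def init_worker(lock_):
        global lock, shm, data
        lock = lock_
        shm = SharedMemory(name=SHM)  # attach once per worker process
        data = get_array(shm)         # one view reused by every task of this worker

    def process(_):
        with lock:
            data[0][:] += 1           # no per-task attach/close needed

    # in main(): Pool(initializer=init_worker, initargs=(lock,))

    This also avoids re-opening the segment for every task, which is the pattern that appeared to exhaust system resources in the question's second version.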