I have a lot of data that I first load into RAM with SharedMemory and then read from many child processes with multiprocessing.Pool.map.
This is a simplified version (not a self-contained, runnable example) of what I am using:
class SharedObject:  # wrapper of SharedMemory passed to subprocesses
    def __init__(self, blablabla):
        self.shmem = SharedMemory(create=True, size=numbers)  # the reference to shmem lives as long as the instance does (shmem not getting GC'd)
        temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        temp_arr[:] = ...lot of data...  # this array gets destroyed after __init__ finishes

    def __getitem__(self, indices) -> np.ndarray:  # used by subprocesses
        selected = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        return selected.__getitem__(indices)

# This is in the main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
    result = list(pool.map(f, shobj))  # f does shobj.__getitem__
I have 64 GB of RAM in my PC. If I run the above algorithm with little data, everything runs smoothly, but if I load a lot (~40 GB) then I get the following error:
n __enter__
return self._semlock.__enter__()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\mnandcharvim\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 321, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(
(basically a lock misuse)
The data is only ever read, so it would be better for me if I could load it into a read-only portion of memory and avoid locks altogether. This question points out that SharedMemory is lock-free, but at this point, based on the error I get, I am not so sure.
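Conceptually, what I would like each worker to do is attach to the already created block by name and build a read-only NumPy view over it, with no lock involved. A rough sketch of that idea (the function and its parameters are only illustrative, not my actual code):

from multiprocessing.shared_memory import SharedMemory
import numpy as np

def open_read_only(name, shape, dtype):
    # Attach to an existing block by name (create=False is the default) and
    # expose it as a NumPy view that refuses writes.
    shm = SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    arr.flags.writeable = False  # guard against accidental writes in the worker
    return shm, arr  # keep shm referenced for as long as arr is used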
I also tried to make the code conform more closely to the example in the official documentation that I linked:
shmems = []  # module-level variable (in the same module as SharedObject), as suggested in some answers

class SharedObject:
    def __init__(self, blablabla):
        shmem = SharedMemory(name=self.name, create=True, size=numbers)  # this reference is destroyed after __init__
        shmems.append(shmem)  # but this one outlives it
        temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
        temp_arr[:] = ...lot of data...

    def __getitem__(self, indices) -> np.ndarray:  # used by subprocesses
        shmem = SharedMemory(name=self.name)  # added line
        selected = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
        return selected.__getitem__(indices)

# This is in the main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
    result = list(pool.map(f, shobj))  # f does shobj.__getitem__
This second version errors out at shmem = SharedMemory(name=self.name) (the added line), saying that I do not have enough system resources to do the mmap (which is odd, since the data has already been mapped in RAM: the resources were enough for what was supposed to be the one-time initial loading of it).
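As far as I can tell, every __getitem__ call in this version maps the segment again inside the worker (and never closes it), so the mapping is repeated for every item processed. A variant I have in mind to avoid that would attach once per worker through the Pool initializer and reuse the handle; a rough, untested sketch (the function names, shape and dtype here are illustrative):

from multiprocessing.shared_memory import SharedMemory
import numpy as np

_shm = None  # one handle per worker process, set by the Pool initializer

def attach(name):
    # Runs once in each worker: map the existing segment a single time.
    global _shm
    _shm = SharedMemory(name=name)

def read_slice(indices):
    # Build a view over the already-attached buffer instead of re-mapping it,
    # and copy the requested slice out of shared memory.
    arr = np.ndarray((2_000_000, 2049), dtype=np.float32, buffer=_shm.buf)
    return arr[indices].copy()

# In the main process (illustrative):
# with multiprocessing.Pool(initializer=attach, initargs=(shobj.name,)) as pool:
#     results = pool.map(read_slice, list_of_indices)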
To reproduce, adjust the arguments to your system's resources so as not to overflow RAM. The example deadlocks in one of the iterations of the main for loop; pressing Ctrl+C shows the error.
I am using Python 3.12.2 (64-bit) on Windows 10.
script.py:
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import multiprocessing


class SharedObject:
    def __init__(self):
        self.shape = (2000000, 2049)
        self.shmem = SharedMemory(create=True, size=2000000*8196)
        self.dtype = np.float32
        _temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        _temp_arr[:] = [np.zeros(shape=2049, dtype=self.dtype) for j in range(2000000)]


def f(data: SharedObject) -> int:
    print("hello")
    return 5


if __name__ == "__main__":
    shobj = SharedObject()
    for j in range(20):
        print(j)
        with multiprocessing.Pool() as pool:
            list(pool.map(f, [shobj]*256))  # 256 calls to f
The code in the question is flawed.
Using a numpy array in shared memory in conjunction with multiprocessing is probably best explained by example.
In this example we create a one-dimensional numpy array of integers in shared memory. Each time the sub-process (process()) is called we add 1 to each element of the array.
Note the essential use of a lock to protect the shared memory while it's being modified.
process() is invoked NPROCS times. Therefore each element of the array should be equal to NPROCS once all processing has completed. We assert that this is True.
from multiprocessing.shared_memory import SharedMemory
import numpy as np
from multiprocessing import Pool, Lock

SHM = "my_shared_memory_segment"
N = 1_000
DTYPE = int
NPROCS = 10_000


def process():
    global lock
    try:
        shm = SharedMemory(name=SHM)
        with lock:
            data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
            data[:] += 1
    finally:
        shm.close()


def init_lock(lock_):
    global lock
    lock = lock_


def main():
    try:
        lock = Lock()
        size = N * np.dtype(DTYPE).itemsize
        shm = SharedMemory(name=SHM, create=True, size=size)
        data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
        data.fill(0)
        pool = Pool(initializer=init_lock, initargs=(lock,))
        try:
            for _ in range(NPROCS):
                pool.apply_async(process)
        finally:
            pool.close()
            pool.join()
        assert np.all(data == NPROCS)
    finally:
        shm.close()
        shm.unlink()


if __name__ == "__main__":
    main()
You should be able to adapt this to suit your specific needs.
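For example, since the data in the question is written once up front and then only read, the lock can be left out on the read side. A possible read-only variant of process(), reusing the SHM, N and DTYPE constants above (sketch only, adjust to your own access pattern):

def read_element(i):
    # Read-only task: attach to the existing segment, copy one value out, detach.
    # No lock is taken because nothing writes to the segment after setup.
    shm = SharedMemory(name=SHM)
    try:
        data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
        return int(data[i])  # copy the value before closing the mapping
    finally:
        shm.close()

# e.g. in main(), once the array has been filled:
# with Pool() as pool:
#     values = pool.map(read_element, range(N))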
EDIT:
Apparently, there's a need for a rather large 2-dimensional ndarray as seen in code posted elsewhere.
Here is a significant edit to my original answer that deals with that requirement.
The code below runs without error on macOS 15.0.1 with Python 3.13.0.
import numpy as np
from multiprocessing import Pool, Lock
from multiprocessing.shared_memory import SharedMemory

SHM = "my_shared_memory_segment"
N = 2_000_000
C = 2_048
DTYPE = np.float32
NPROCS = 256


def process(_):
    global lock
    try:
        shm = SharedMemory(name=SHM)
        with lock:
            data = get_array(shm)
            data[0][:] += 1
    finally:
        shm.close()


def init_lock(lock_):
    global lock
    lock = lock_


def get_array(shm):
    return np.ndarray((N, C), DTYPE, shm.buf)


def main():
    try:
        lock = Lock()
        size = N * C * np.dtype(DTYPE).itemsize
        shm = SharedMemory(name=SHM, create=True, size=size)
        data = get_array(shm)
        data.fill(0.0)
        with Pool(initializer=init_lock, initargs=(lock,)) as pool:
            pool.map(process, range(NPROCS))
    finally:
        shm.close()
        shm.unlink()


if __name__ == "__main__":
    main()