python, python-multiprocessing, shared-memory, python-3.9, memory-corruption

Python's multiprocessing SharedMemory ending in memory corruption


I'm trying to pass a SharedMemory reference to an already running process using a queue. The problem is that once I receive (or get) the SharedMemory object in the other process, the contents of the corresponding memory block do not match at all, and even the reported size is too big.

import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory


def f(q):
    shared_memory = q.get()
    print(f"In Process: {shared_memory=}")
    x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)
    print(f"In Process: {x=}")


if __name__ == '__main__':
    temp_array = np.arange(8)
    print(f"Main: {temp_array=}")
    smh = SharedMemory(create=True, size=temp_array.nbytes)
    print(f"Main: {smh=}")
    fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
    fix_array[:] = temp_array[:]
    print(f"Main: {fix_array=}")

    queue = mp.Queue()
    proc = mp.Process(target=f, args=(queue,))
    proc.start()

    queue.put(smh)

If I run this code it spits out following output:

Main: temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])
Main: smh=SharedMemory('wnsm_2202c81b', size=32)
Main: fix_array=array([0, 0, 0, 0, 0, 0, 0, 0])
In Process: shared_memory=SharedMemory('wnsm_2202c81b', size=4096)
In Process: x=array([0., (weird very small numbers and many many zeros...), 0.])

I expected to get the original temp_array=array([0, 1, 2, 3, 4, 5, 6, 7]) back 🤔

According to the docs, it is possible that the reported memory size is larger than originally requested. Furthermore, I tested it with an array of 1e6 items, with passing only the name of the SharedMemory, and with a Pipe instead of a Queue, but the result is still the same.

Am I doing something wrong or is this a bug?

(I'm on Windows 10 Build 19043, Python 3.9.6 64bit)


Solution

  • Thanks to @Timus

    I think it is best to split this into two separate problems:

    Problem 1, Weird numbers:

    If you adjust the line in f to x = np.frombuffer(buffer=shared_memory.buf, dtype=np.int32), you'll get your numbers back (int32 was the array's actual type).

    The error was, as @Timus pointed out, a mismatch of data types: on Windows, np.arange() returns an np.ndarray with dtype=np.int32, but I was trying to read the buffer as dtype=np.float64, hence the faulty result.

    Fix:

    @Timus' solution, or add dtype=np.float64 as a parameter of np.arange() so that it reads: temp_array = np.arange(8, dtype=np.float64)
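
    To see where the "weird very small numbers" come from, here is a minimal standalone sketch (no shared memory involved) that reinterprets the raw bytes of an int32 array as float64, the same reinterpretation np.frombuffer performs on the shared buffer:

    import numpy as np

    raw = np.arange(8, dtype=np.int32).tobytes()  # 32 bytes holding eight int32 values
    print(np.frombuffer(raw, dtype=np.int32))     # [0 1 2 3 4 5 6 7] -- correct dtype
    print(np.frombuffer(raw, dtype=np.float64))   # 4 tiny denormal floats -- same bytes, wrong dtype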


    Problem 2, Too long arrays:

    According to the Python Docs, SharedMemory.size may be larger than originally requested. Therefore, the length of an array built on top of the buffer may be larger as well.
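
    A quick way to observe this (a sketch; the exact numbers are platform-dependent) is to create a block and immediately re-attach to it by name:

    from multiprocessing.shared_memory import SharedMemory

    a = SharedMemory(create=True, size=32)
    b = SharedMemory(name=a.name)  # attach to the same block by name
    print(a.size, b.size)  # e.g. "32 4096" on Windows (as in the output above); on Linux both typically print 32
    b.close()
    a.close()
    a.unlink()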

    Fix / Workaround:

    Trim the array to its original size, e.g. by using numpy.resize(). For that, the original shape needs to be passed to f() as well. Although it is fine for me, the following point might be a problem for others: since x is only a view of the buffer, np.ndarray.resize() is unusable (the array does not own its data). numpy.resize() makes a copy instead, and changes made to the resized copy won't be reflected in the main process! To accommodate for that, the values of x_resized can be copied back into x (a zero-copy alternative is sketched at the very end).


    The fixed code now looks like this:

    import multiprocessing as mp
    from multiprocessing.shared_memory import SharedMemory
    
    import numpy as np
    
    def f(q):
        shared_memory, shape = q.get()  # the original shape is passed here
        x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)  # dtype matches now
        # x = np.trim_zeros(x, "b") won't work: it would also strip genuine trailing zeros from the data
        x_resized = np.resize(x, new_shape=shape)  # a copy: changes are NOT reflected in the main process

        ###
        # do things with x_resized here
        ###

        x[:shape[0]] = x_resized  # copy the changes back into the shared buffer
    
    if __name__ == '__main__':
        temp_array = np.arange(8, dtype=np.float64) # dtype is correctly specified
        
        smh = SharedMemory(create=True, size=temp_array.nbytes)
        fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
        fix_array[:] = temp_array[:]
    
        queue = mp.Queue()
        proc = mp.Process(target=f, args=(queue,))
        proc.start()

        queue.put((smh, temp_array.shape))  # passing the original shape along

        proc.join()   # wait for the worker before releasing the block
        smh.close()   # detach this process's view of the memory
        smh.unlink()  # free the shared memory block


    The weird thing is that while x in the second process is too long, back in the main process fix_array still keeps the correct size... This is most likely because smh.buf in the main process is a memoryview of exactly the size requested at creation, whereas the child re-attaches to the mapping by name and (on Windows) sees the size rounded up to the page size, here 4096 bytes.
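
    As an aside (not part of the original fix): if you only need the first shape elements, you can avoid the resize-and-copy-back dance entirely, because basic slicing of a NumPy array returns a view into the same buffer, so in-place writes go straight to the shared block. A minimal sketch of f using this approach:

    def f(q):
        shared_memory, shape = q.get()
        # slicing keeps x a view into shared_memory.buf, so writes propagate
        x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)[:shape[0]]
        x += 1  # e.g. an in-place change, immediately visible in the main process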