Tags: python, multiprocessing, data-acquisition

Acquire data in one process and analyze those data using two separate processes (in python, multiprocessing)


I aim to acquire data in one process and analyze those data using two separate processes, which will run in parallel.

In the provided minimal example, the initial process generates (every 1 second) data consisting of three arrays: array1, array2, and array3. Subsequently, two additional processes analyze these arrays.

I am seeking confirmation on the correctness of this approach, particularly regarding the analysis of the data. Is it best to analyze the data here:

# This is where I would do some processing on the data

?

from multiprocessing import shared_memory, Process, Lock, Value
import numpy as np
import time


# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration):
    """Produce 10 batches of random data into the shared-memory block named *n*.

    The block packs three float32 arrays back-to-back (4 bytes per element)
    with shapes shape1, shape2, shape3.  Each batch is written under *lock*
    so a consumer never observes a half-written update; afterwards both
    new_value_flag* flags are set to 1 to signal the consumers and
    *iteration* records the batch index.
    """
    for i in range(10):
        with lock:
            # NOTE(review): the segment is re-attached and re-closed on every
            # iteration; attaching once before the loop would avoid this churn.
            existing_shm = shared_memory.SharedMemory(name=n)
            # Zero-copy views over the three consecutive float32 regions.
            np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
            np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
            np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
            np_array1[:] = np.random.randint(0, 1000, np_array1.shape)
            np_array2[:] = np.random.randint(0, 1000, np_array2.shape)
            np_array3[:] = np.random.randint(0, 1000, np_array3.shape)
            existing_shm.close()
            new_value_flag1.value = 1
            new_value_flag2.value = 1
            iteration.value = i
        time.sleep(1)  # produce a new batch roughly once per second

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, lock, new_value_flag1, iteration):
    """Poll new_value_flag1; when set, copy the three arrays out of the
    shared-memory block *n* under *lock*, then analyze the private copies.

    Copying inside the lock and processing outside it keeps the critical
    section short, so the producer and the other consumer are not blocked
    while this consumer does its (potentially slow) analysis.
    """
    while True:
        # NOTE(review): the flag is read outside the lock; with a single
        # producer this is benign, but strictly it should be re-checked
        # once the lock is held.
        if new_value_flag1.value == 1:
            with lock:
                print('Start consumer1',time.time())
                existing_shm = shared_memory.SharedMemory(name=n)
                # .copy() detaches the data from the shared buffer so the
                # lock can be released before the processing step.
                np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4]).copy()
                np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))]).copy()
                np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):]).copy()
                print(f"consumer 1, Iteration {iteration.value}:")
                existing_shm.close()
                new_value_flag1.value = 0  # acknowledge: this batch was taken
                print('Stop consumer1',time.time())
            # This is where I would do some processing on the data
            print(np_array1.mean())
            print(np_array2.mean())
            print(np_array3.mean())
        time.sleep(0.01)  # polling interval

def consumer2(n, shape1, shape2, shape3, lock, new_value_flag2, iteration):
    """Second consumer; identical to consumer1 but driven by
    new_value_flag2, so both consumers independently see every batch."""
    while True:
        # NOTE(review): flag read outside the lock — same caveat as consumer1.
        if new_value_flag2.value == 1:
            with lock:
                print('Start consumer2',time.time())
                existing_shm = shared_memory.SharedMemory(name=n)
                # Private copies so the lock is released before processing.
                np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4]).copy()
                np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))]).copy()
                np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):]).copy()
                print(f"consumer 2, Iteration {iteration.value}:")
                existing_shm.close()
                new_value_flag2.value = 0  # acknowledge: this batch was taken
                print('Stop consumer2',time.time())
            # This is where I would do some processing on the data
            print(np_array1.mean())
            print(np_array2.mean())
            print(np_array3.mean())
        time.sleep(0.01)  # polling interval
        
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes (float32)
    shape1 = (2000, 20000)
    shape2 = (2000, 30000)
    shape3 = (2000, 40000)
    # Total byte size of the three float32 arrays packed back-to-back.
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size= total_size)
    lock = Lock()
    # One "new data" flag per consumer plus a shared iteration counter.
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)


    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag2, iteration))

    # Start the consumers first so they are already polling when data arrives.
    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    # NOTE(review): the consumers loop forever, so these joins never return.
    p2.join()
    p3.join()
    p1.join()

    # I know I have to Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

EDIT Following the first comment, I have a new code :

from multiprocessing import shared_memory, Process, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration):
    """Lock-free variant: attach to the block once, then publish a new batch
    only when BOTH consumers have cleared their flags (i.e. finished with
    the previous batch).  Setting the flags back to 1 announces new data.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Persistent zero-copy views over the three consecutive float32 regions.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
        # Only overwrite the buffers once both consumers are done reading.
        if new_value_flag1.value ==0 and new_value_flag2.value == 0:
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            new_value_flag1.value = 1
            new_value_flag2.value = 1
            iteration.value = i
            print('producer', i, time.time()-start_time)
        time.sleep(0.5)
    existing_shm.close()

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_value_flag1, iteration):
    """Poll new_value_flag1; when set, process the shared arrays IN PLACE
    (no copy), then clear the flag so the producer may overwrite the
    buffers.  Processing must therefore finish before the flag is cleared,
    which it does here.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_value_flag1.value == 1:
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_value_flag1.value = 0
        time.sleep(0.01)  # polling interval
    existing_shm.close()  # NOTE(review): unreachable — the loop above never breaks

# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_value_flag2, iteration):
    """Second consumer; identical to consumer1 but driven by
    new_value_flag2 so each consumer acknowledges batches independently."""
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_value_flag2.value == 1:
            # This is where I would do some processing on the data
            print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_value_flag2.value = 0
        time.sleep(0.01)  # polling interval
    existing_shm.close()  # NOTE(review): unreachable — the loop above never breaks
    
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (200, 200)
    shape2 = (200, 300)
    shape3 = (200, 400)
    # Total byte size of the three float32 arrays packed back-to-back.
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size= total_size)
    # One "new data" flag per consumer plus a shared iteration counter.
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)


    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_value_flag2, iteration))

    # Start the consumers first so they are already polling when data arrives.
    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    # NOTE(review): the consumers loop forever, so these joins never return.
    p2.join()
    p3.join()
    p1.join()

    # I know I have to Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

EDIT 2 Without unnecessary mapping and unmapping + using Event

from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time

# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration):
    """Same handshake as the flag version, expressed with Events: publish a
    new batch only when both consumer events are clear, then set both
    events to announce the new data."""
    existing_shm = shared_memory.SharedMemory(name=n)
    # Persistent zero-copy views over the three consecutive float32 regions.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
        # Only overwrite the buffers once both consumers have cleared their events.
        if not new_values_event_1.is_set() and not new_values_event_2.is_set():
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            new_values_event_1.set()
            new_values_event_2.set()
            iteration.value = i
            print('producer', i, time.time()-start_time)
        time.sleep(0.5)
    existing_shm.close()

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_values_event_1, iteration):
    """Poll new_values_event_1; when set, process the shared arrays in
    place, then clear the event to let the producer write the next batch.

    NOTE(review): this still polls with is_set() + sleep rather than
    blocking in wait() — addressed in EDIT 3.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_values_event_1.is_set():
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_values_event_1.clear()
        time.sleep(0.01)  # polling interval
    existing_shm.close()  # NOTE(review): unreachable — the loop above never breaks

# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_values_event_2, iteration):
    """Second consumer; identical to consumer1 but driven by
    new_values_event_2 so each consumer acknowledges batches independently."""
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_values_event_2.is_set():
            # This is where I would do some processing on the data
            print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_values_event_2.clear()
        time.sleep(0.01)  # polling interval
    existing_shm.close()  # NOTE(review): unreachable — the loop above never breaks

if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (50, 50)
    shape2 = (50, 50)
    shape3 = (50, 50)
    # Total byte size of the three float32 arrays packed back-to-back.
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size= total_size)
    # One Event per consumer plus a shared iteration counter.
    new_values_event_1 = Event()
    new_values_event_2 = Event()
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_values_event_1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_values_event_2, iteration))

    # Start the consumers first so they are already polling when data arrives.
    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    # NOTE(review): the consumers loop forever, so these joins never return.
    p2.join()
    p3.join()
    p1.join()

    # I know I have to Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

EDIT 3 Without unnecessary mapping and unmapping + using Event + removing unnecessary polling

from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time


# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration):
    """Blocking handshake: wait until both consumers signal readiness, write
    a new batch, then set event_producer to wake the consumers.

    NOTE(review): a single event_producer shared by two consumers is racy —
    whichever consumer wakes first clears it, so the other consumer can
    miss the signal.  The answer below fixes this with one producer event
    per consumer.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Persistent zero-copy views over the three consecutive float32 regions.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
            event_consumer1.wait()  # Wait until consumer 1 is ready
            event_consumer1.clear()  # Reset the event
            event_consumer2.wait()  # Wait until consumer 2 is ready
            event_consumer2.clear()  # Reset the event
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            iteration.value = i
            print('producer', i, time.time()-start_time)
            event_producer.set()  # Signal the consumers that new data is available
            time.sleep(2) # delay to simulate the time at which the data is produced
    existing_shm.close()

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, event_consumer1, event_producer, iteration):
    """Block on event_producer, process the shared arrays in place, then set
    event_consumer1 to tell the producer this consumer is done.

    NOTE(review): event_producer is shared with consumer2 and cleared here,
    so the two consumers race for each signal (see the answer below).
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
            event_producer.wait()  # Wait until the producer has produced new data
            event_producer.clear()  # Reset the event
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            time.sleep(0.1) # delay to simulate the time at which the data is processed
            event_consumer1.set()  # Signal the producer that the data has been processed
    existing_shm.close()  # NOTE(review): unreachable — the loop above never breaks

# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, event_consumer2, event_producer, iteration):
    """Second consumer; identical to consumer1 but acknowledges via
    event_consumer2.  Shares (and races for) the same event_producer."""
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        event_producer.wait()  # Wait until the producer has produced new data
        event_producer.clear()  # Reset the event
        # This is where I would do some processing on the data
        print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1) # delay to simulate the time at which the data is processed
        event_consumer2.set()  # Signal the producer that the data has been processed
    existing_shm.close()  # NOTE(review): unreachable — the loop above never breaks

if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (5000, 50)
    shape2 = (5000, 50)
    shape3 = (5000, 50)


    # Total byte size of the three float32 arrays packed back-to-back.
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size= total_size)
    event_consumer1 = Event()
    event_consumer2 = Event()
    event_consumer1.set()  # Set the event to allow the producer to start
    event_consumer2.set()  # Set the event to allow the producer to start
    event_producer = Event()  # single event shared by both consumers (racy — see note in producer)
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3,  event_consumer1, event_consumer2,event_producer , iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_producer,  iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, event_consumer2, event_producer, iteration))

    # Start the consumers first so they are already waiting when data arrives.
    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    # NOTE(review): the consumers loop forever, so these joins never return.
    p2.join()
    p3.join()
    p1.join()

    # I know I have to Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

Solution

  • Looking at your EDIT 3 code I see a few problems:

    1. On my Windows platform, the creation of shared memory raises an exception because you have total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4 and you then pass total_size as the size argument to your call to shared_memory.SharedMemory. But this call requires this argument to be an int rather than a numpy.int32. You should not be using np.prod to multiply together the elements of a tuple. Instead use operator.mul, for example operator.mul(*shape1).
    2. Your program does not terminate because your consumer processes are hanging. You should use an additional shared variable, running, that the consumers check to see whether the producer is still producing data.
    3. Instead of a single event_producer Event instance that is only used by one of your consumers, you should have one for each consumer.
    4. Your producer function is setting and waiting on events in an illogical order.

    I have added comments in the following code tagged with 'Booboo' so you can search for them. Also, I have the producer creating only 3 iterations so that it terminates more quickly:

    from multiprocessing import shared_memory, Process, Event, Value
    import numpy as np
    import time
    # Used this to compute the size of shared memory - Booboo:
    from operator import mul
    
    
    # create a shared memory and write to it (producer)
    def producer(n, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer1, event_producer2, iteration, running):
        """Produce 3 batches.  After each write, set BOTH per-consumer
        producer events, then wait for both consumer events before reusing
        the shared buffers.  On exit, clear *running* and set the producer
        events one last time so the consumers wake up and terminate.
        """
        existing_shm = shared_memory.SharedMemory(name=n)
        # mul(*shape) yields a plain Python int (np.prod would give a numpy
        # integer, which SharedMemory rejects as a size on some platforms).
        np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
        np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
        np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
        for i in range(3): # Loop just 3 times - Booboo:
            # Note that the order of setting and waiting has been changed - Booboo:
            start_time = time.time()
            time.sleep(2) # delay to simulate the time at which the data is produced
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            iteration.value = i
            print('producer', i, time.time()-start_time)
            event_producer1.set()  # Signal the consumers that new data is available
            event_producer2.set()  # Signal the consumers that new data is available
            event_consumer1.wait()  # Wait until consumer 1 is ready
            event_consumer1.clear()  # Reset the event
            event_consumer2.wait()  # Wait until consumer 2 is ready
            event_consumer2.clear()  # Reset the event

        # Show we are no longer running - Booboo:
        running.value = 0
        # Wake up the consumers - Booboo:
        event_producer1.set()
        event_producer2.set()
        existing_shm.close()
    
    # read from the shared memory using a different process (consumer 1)
    def consumer1(n, shape1, shape2, shape3, event_consumer1, event_producer, iteration, running):
        """Block on this consumer's own producer event, exit when *running*
        is cleared, otherwise process the shared arrays in place and
        acknowledge via event_consumer1."""
        existing_shm = shared_memory.SharedMemory(name=n)
        np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
        np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
        np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
        while True:
            event_producer.wait()  # Wait until the producer has produced new data or is no longer running - Booboo
            event_producer.clear()  # Reset the event
            if not running.value:
                break  # producer has finished — terminate cleanly
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            time.sleep(0.1) # delay to simulate the time at which the data is processed
            event_consumer1.set()  # Signal the producer that the data has been processed
        existing_shm.close()
    
    # read from the shared memory using a different process (consumer 2)
    def consumer2(n, shape1, shape2, shape3, event_consumer2, event_producer, iteration, running):
        """Second consumer; identical to consumer1 but driven by its own
        producer event and acknowledging via event_consumer2."""
        existing_shm = shared_memory.SharedMemory(name=n)
        np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
        np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
        np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
        while True:
            event_producer.wait()  # Wait until the producer has produced new data or is no longer running - Booboo
            event_producer.clear()  # Reset the event
            if not running.value:
                break  # producer has finished — terminate cleanly
            # This is where I would do some processing on the data
            print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            time.sleep(0.1) # delay to simulate the time at which the data is processed
            event_consumer2.set()  # Signal the producer that the data has been processed
        existing_shm.close()
    
    if __name__ == '__main__':
        # assume we have 3 arrays of different sizes
        shape1 = (5000, 50)
        shape2 = (5000, 50)
        shape3 = (5000, 50)


        # mul(*shape) returns a plain int, as required by SharedMemory(size=...).
        total_size = mul(*shape1)*4 + mul(*shape2)*4 + mul(*shape3)*4
        shm = shared_memory.SharedMemory(create=True, size=total_size)
        event_consumer1 = Event()
        event_consumer2 = Event()
        event_consumer1.set()  # Set the event to allow the producer to start
        event_consumer2.set()  # Set the event to allow the producer to start
        # One producer event per consumer so no consumer can miss a signal.
        event_producer1 = Event()
        event_producer2 = Event()
        iteration = Value('i', 0)
        running = Value('i', 1)  # cleared by the producer to stop the consumers

        p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3,  event_consumer1, event_consumer2, event_producer1, event_producer2, iteration, running))
        p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_producer1,  iteration, running))
        p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, event_consumer2, event_producer2, iteration, running))

        # Start the consumers first so they are already waiting when data arrives.
        p2.start()
        p3.start()
        time.sleep(2) # delay to make sure the consumer processes are ready
        p1.start()

        p2.join()
        p3.join()
        p1.join()

        # Dispose of shared memory - Booboo:
        shm.close()
        shm.unlink()
    

    Prints:

    producer 0 2.029968023300171
    consumer1 0 127.14897 127.03962 127.098465 1706964539.3494873
    consumer2 0 127.14897 127.03962 127.098465 1706964539.3494873
    producer 1 2.033132553100586
    consumer1 1 127.50007 127.03064 127.2023 1706964541.3846073
    consumer2 1 127.50007 127.03064 127.2023 1706964541.3846073
    producer 2 2.0330991744995117
    consumer2 2 126.92322 126.934265 126.91858 1706964543.423699
    consumer1 2 126.92322 126.934265 126.91858 1706964543.423699
    

    Can We Do Better?

    The problem with this design is what if you have a large number of consumers, for example 10? That's a lot of events that have to be created and set. Instead we will use two multiprocessing.Condition instances as follows:

    from multiprocessing import shared_memory, Process, Value, Condition
    import numpy as np
    import time
    # Used this to compute the size of shared memory - Booboo:
    from operator import mul
    
    
    # create a shared memory and write to it (producer)
    def producer(name, shape1, shape2, shape3, n_consumers, produce_condition, consume_condition, consumed_count, iteration, running):
        """Condition-based producer that scales to any number of consumers.

        For each of 3 iterations: generate the next batch in private arrays
        (overlapping with the consumers' processing of the current batch),
        wait on produce_condition until all n_consumers have reported via
        consumed_count, copy the new batch into shared memory, then
        notify_all on consume_condition with the new iteration number.
        Finally clears *running* and notifies everyone so consumers exit.
        """
        existing_shm = shared_memory.SharedMemory(name=name)
        np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
        np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
        np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
        for i in range(3): # Loop just 3 times
            # Produce data for the next iteration without updating shared memory:
            start_time = time.time()
            time.sleep(2) # delay to simulate the time at which the data is produced
            array1 = np.random.randint(0, 255, np_array1.shape)
            array2 = np.random.randint(0, 255, np_array2.shape)
            array3 = np.random.randint(0, 255, np_array3.shape)

            if i != 0:
                # Wait for prior data to be consumed before we can update shared memory:
                with produce_condition:
                    produce_condition.wait_for(lambda: consumed_count.value == n_consumers)
                # All consumers are now waiting on consume_condition, so
                # resetting the counter outside the lock is safe here.
                consumed_count.value = 0  # reset for next time

            # Update shared memory with already computed data:
            np_array1[:] = array1
            np_array2[:] = array2
            np_array3[:] = array3
            print('producer', i, time.time()-start_time)

            with consume_condition:
                iteration.value = i
                # Tell consumers there is new data:
                consume_condition.notify_all()

        # Show we are no longer running:
        with consume_condition:
            running.value = 0
            consume_condition.notify_all()

        existing_shm.close()
    
    # read from the shared memory using a different process:
    def consumer(id, name, shape1, shape2, shape3, produce_condition, consume_condition, consumed_count, iteration, running):
        """Generic consumer (one of n): waits on consume_condition for its
        expected iteration number (or producer shutdown), processes the
        shared arrays in place, then bumps consumed_count under
        produce_condition so the producer knows this batch is done.
        """
        existing_shm = shared_memory.SharedMemory(name=name)
        np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
        np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
        np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])

        next_iteration = -1
        while True:
            next_iteration += 1 # What we expect the next iteration value to be:
            with consume_condition:
                # Wait for either the producer having terminated or a new iteration being available:
                consume_condition.wait_for(
                    lambda: not running.value or iteration.value == next_iteration
                )

            if iteration.value != next_iteration:
                # Running.value must be 0 but we do not check this because there can still be
                # new data we haven't processed even though the producer is no longer running,
                # So as long as iteration.value is the next iteration expected we have
                # new data to process.
                break

            print(f'consumer{id}', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            time.sleep(1.5) # delay to simulate the time at which the data is processed

            # Show we have consumed the data:
            with produce_condition:
                consumed_count.value += 1
                produce_condition.notify(1)

        existing_shm.close()
    
    
    if __name__ == '__main__':
        # assume we have 3 arrays of different sizes
        shape1 = (5000, 50)
        shape2 = (5000, 50)
        shape3 = (5000, 50)


        # mul(*shape) returns a plain int, as required by SharedMemory(size=...).
        total_size = mul(*shape1)*4 + mul(*shape2)*4 + mul(*shape3)*4
        shm = shared_memory.SharedMemory(create=True, size=total_size)

        N_CONSUMERS = 10

        # Two conditions replace the per-consumer events: one for the
        # producer to wait on, one to broadcast new data to all consumers.
        produce_condition = Condition()
        consume_condition = Condition()

        consumed_count = Value('i', 0)  # how many consumers finished the current batch

        iteration = Value('i', -1)  # -1 so consumers start by waiting for iteration 0
        running = Value('i', 1)  # cleared by the producer to stop the consumers

        processes = []

        p = Process(target=producer, args=(shm.name, shape1, shape2, shape3, N_CONSUMERS, produce_condition, consume_condition, consumed_count, iteration, running))
        processes.append(p)

        for id in range(N_CONSUMERS):
            processes.append(Process(target=consumer, args=(id, shm.name, shape1, shape2, shape3, produce_condition, consume_condition, consumed_count, iteration, running)))

        for p in processes:
            p.start()

        t = time.time()

        for p in processes:
            p.join()

        print('Total time:', time.time() - t)
        # Dispose of shared memory:
        shm.close()
        shm.unlink()
    

    Prints:

    producer 0 2.0170440673828125
    consumer1 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer9 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer5 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer2 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer8 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer0 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer4 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer3 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer7 0 126.91085 127.04398 126.95515 1706970488.860498
    consumer6 0 126.91085 127.04398 126.95515 1706970488.860498
    producer 1 2.000509023666382
    consumer9 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer0 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer1 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer8 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer2 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer5 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer4 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer3 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer7 1 126.91934 127.10621 127.05637 1706970490.8453243
    consumer6 1 126.91934 127.10621 127.05637 1706970490.8453243
    producer 2 2.016110420227051
    consumer9 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer8 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer2 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer0 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer6 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer1 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer3 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer5 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer7 2 127.029526 126.93016 126.82547 1706970492.8614347
    consumer4 2 127.029526 126.93016 126.82547 1706970492.8614347
    Total time: 8.093537330627441
    

    Update

    I have modified the above code that uses conditions instead of events so that there is greater overlap in processing between the producer and the consumers. Before, the producer started to produce the next iteration only after the consumers had all finished processing the current iteration. Now the producer can do all the work necessary to create the new array values for the next iteration in parallel with the consumers consuming the current values, and it waits for the consumers to finish their processing before updating the shared arrays with the new data. I have increased the consumption time to 1.5 seconds. In the previous version this would have added several seconds to the total running time needed to process 3 iterations.