Tags: python, cuda, multiprocessing, nvidia, nvtx

NVTX markers with Python multiprocessing


I'm trying to use NVTX markers together with a multiprocessing pool in Python, but when an annotated function is called only from a child process, the range doesn't appear in the profiling report. Is there any way around this, or is it a limitation of Python processes? Here's some example code to reproduce the issue:

import os
import time
from multiprocessing import Pool, shared_memory

import numpy as np
import nvtx

N_SAMPLES = int(1e6)
SIGNAL = np.random.randn(N_SAMPLES) + 1j * np.random.randn(N_SAMPLES)

@nvtx.annotate(color="red")
def create_shm_array(signal):
    # Store the signal in shared memory to share across processes
    shm = shared_memory.SharedMemory(create=True, size=signal.nbytes)
    shared_array = np.ndarray(signal.shape, dtype=signal.dtype, buffer=shm.buf)
    shared_array[:] = signal[:]
    return shm


def worker(shm_name):
    # Attach to the existing shared-memory block by name and run the annotated op
    shm = shared_memory.SharedMemory(name=shm_name)
    sig = np.ndarray((N_SAMPLES,), dtype=complex, buffer=shm.buf)
    return expensive_op(sig)


@nvtx.annotate(color="blue")
def expensive_op(sig):
    time.sleep(2)
    return np.sum(sig)


def clean_shm(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    shm.close()
    shm.unlink()


if __name__ == "__main__":

    print(f"Total num_bytes: {SIGNAL.nbytes} B | {SIGNAL.nbytes / 1e9} GB")
    # Warm-up call from the parent process; this annotated call shows up in the report
    test = np.random.randn(10)
    expensive_op(test)
    shared_mem = create_shm_array(SIGNAL)

    # The annotated op below runs only in child processes
    with Pool(os.cpu_count()) as p:
        p.map(worker, [shared_mem.name] * 2)
    clean_shm(shared_mem.name)

Here's the NVIDIA Nsight Systems timeline. The marker appears during the first call from the parent process, but does not appear when the function is called by the child processes.
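
(For context: a timeline like this is typically captured with the Nsight Systems CLI, e.g. nsys profile --trace=nvtx python script.py; the exact invocation isn't shown in the post, so treat that command as an assumption.)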


Solution

  • By default, Python's multiprocessing forks new worker processes on Linux, and Nsight Systems does not follow fork-only children, so NVTX ranges emitted inside them never reach the report. Switching the start method to spawn launches each worker as a fresh Python interpreter that the profiler can attach to. Working code below.

    import os
    import time
    from multiprocessing import Pool, shared_memory, get_context
    
    import numpy as np
    import nvtx
    
    N_SAMPLES = int(1e6)
    SIGNAL = np.random.randn(N_SAMPLES) + 1j * np.random.randn(N_SAMPLES)
    
    @nvtx.annotate(color="red")
    def create_shm_array(signal):
        # Store the signal in shared memory to share across processes
        shm = shared_memory.SharedMemory(create=True, size=signal.nbytes)
        shared_array = np.ndarray(signal.shape, dtype=signal.dtype, buffer=shm.buf)
        shared_array[:] = signal[:]
        return shm
    
    
    def worker(shm_name):
        shm = shared_memory.SharedMemory(name=shm_name)
        sig = np.ndarray((N_SAMPLES,), dtype=complex, buffer=shm.buf)
        return expensive_op(sig)
    
    
    @nvtx.annotate(color="blue")
    def expensive_op(sig):
        time.sleep(2)
        return np.sum(sig)
    
    
    def clean_shm(shm_name):
        shm = shared_memory.SharedMemory(name=shm_name)
        shm.close()
        shm.unlink()
    
    
    if __name__ == "__main__":
    
        print(f"Total num_bytes: {SIGNAL.nbytes} B | {SIGNAL.nbytes / 1e9} GB")
        test = np.random.randn(10)
        expensive_op(test)
        shared_mem = create_shm_array(SIGNAL)
    
        # Spawn fresh interpreters instead of forking so the profiler can follow the workers
        with get_context("spawn").Pool(os.cpu_count()) as p:
            p.map(worker, [shared_mem.name] * 2)
        clean_shm(shared_mem.name)
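
  • Equivalently, the start method can be set once, process-wide, with multiprocessing.set_start_method before any pool is created. A minimal, self-contained sketch of that API; the work function here is a hypothetical stand-in for the real annotated workload:

    import multiprocessing as mp

    def work(x):
        # Stand-in for the real annotated workload
        return x * x

    if __name__ == "__main__":
        # set_start_method may be called at most once, before any Pool is created
        mp.set_start_method("spawn")
        with mp.Pool(2) as p:
            print(p.map(work, range(4)))

  • If forking must be kept (for example, to inherit large read-only state cheaply), check whether your version of Nsight Systems can follow forked children via the --trace-fork-before-exec option of nsys profile.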