I'm trying to use nvtx markers along with multiprocessing pool in Python, but when only a child process calls an annotated function the operation doesn't appear in the profiling report. Is there any way to get around this, or is this a limitation of python processes? Here's some example code to replicate:
import os
import time
from multiprocessing import Pool, shared_memory
import numpy as np
import nvtx
# Number of complex samples in the synthetic signal (1e6 complex128 -> ~16 MB).
N_SAMPLES = int(1e6)
# Random complex test signal, generated once at import time.
SIGNAL = np.random.randn(N_SAMPLES) + 1j * np.random.randn(N_SAMPLES)
@nvtx.annotate(color="red")
def create_shm_array(signal):
    """Copy *signal* into a freshly created shared-memory block.

    Returns the SharedMemory handle; the caller owns it and is
    responsible for eventually calling close()/unlink() on it.
    """
    block = shared_memory.SharedMemory(create=True, size=signal.nbytes)
    # View the raw shared buffer as an array of the same shape/dtype
    # and copy the signal into it.
    mirror = np.ndarray(signal.shape, dtype=signal.dtype, buffer=block.buf)
    np.copyto(mirror, signal)
    return block
def worker(shm_name):
    """Attach to the named shared-memory block and reduce it.

    Opens the block by name, views its buffer as the complex signal
    array, runs the annotated expensive reduction, and returns the
    result. Fix: the original never closed the child's SharedMemory
    handle, leaking the mapping in every worker process.
    """
    shm = shared_memory.SharedMemory(name=shm_name)
    sig = np.ndarray((N_SAMPLES,), dtype=complex, buffer=shm.buf)
    result = expensive_op(sig)
    # Drop the ndarray view first: an outstanding buffer export would
    # prevent the underlying mmap from closing cleanly.
    del sig
    shm.close()
    return result
@nvtx.annotate(color="blue")
def expensive_op(sig):
    """Stand-in for heavy work: sleep two seconds, then sum the signal."""
    time.sleep(2)
    total = np.sum(sig)
    return total
def clean_shm(shm_name):
    """Attach to the named shared-memory block, then free it for good."""
    handle = shared_memory.SharedMemory(name=shm_name)
    handle.close()
    # unlink() destroys the underlying block system-wide.
    handle.unlink()
if __name__ == "__main__":
    # Report the payload size that will be shared with the workers.
    print(f"Total num_bytes: {SIGNAL.nbytes} B | {SIGNAL.nbytes / 1e9} GB")
    # Warm-up call from the parent process; this nvtx range does appear
    # in the Nsight Systems timeline.
    test = np.random.randn(10)
    expensive_op(test)
    # Copy the signal into shared memory so children can attach by name.
    shared_mem = create_shm_array(SIGNAL)
    with Pool(os.cpu_count()) as p:
        # Two pool children each run the annotated op; their nvtx ranges
        # are missing from the report (the issue being asked about).
        p.map(worker, [shared_mem.name] * 2)
    clean_shm(shared_mem.name)
Here's the Nvidia Nsight Systems Timeline. The Marker appears during the first call from the parent process, but does not appear when called by the child processes
On Linux, Python's multiprocessing starts child processes by fork by default (macOS and Windows already default to spawn). A forked child inherits the parent's memory but the profiler's injection is not re-initialized in it, so its nvtx ranges never reach the report. Switching the start method to spawn makes each child start a fresh interpreter and re-import the module, which re-initializes the instrumentation. Working code below.
import os
import time
from multiprocessing import Pool, shared_memory, get_context
import numpy as np
import nvtx
# Number of complex samples in the synthetic signal (1e6 complex128 -> ~16 MB).
N_SAMPLES = int(1e6)
# Random complex test signal, generated once at import time.
SIGNAL = np.random.randn(N_SAMPLES) + 1j * np.random.randn(N_SAMPLES)
@nvtx.annotate(color="red")
def create_shm_array(signal):
    """Copy *signal* into a new shared-memory block and return its handle.

    The caller owns the returned SharedMemory object and must close()
    and unlink() it when done.
    """
    block = shared_memory.SharedMemory(create=True, size=signal.nbytes)
    # Wrap the shared buffer in an ndarray of matching shape/dtype,
    # then copy the signal into place.
    mirror = np.ndarray(signal.shape, dtype=signal.dtype, buffer=block.buf)
    np.copyto(mirror, signal)
    return block
def worker(shm_name):
    """Attach to the named shared-memory block and reduce it.

    Opens the block by name, views its buffer as the complex signal
    array, runs the annotated expensive reduction, and returns the
    result. Fix: the original never closed the child's SharedMemory
    handle, leaking the mapping in every worker process.
    """
    shm = shared_memory.SharedMemory(name=shm_name)
    sig = np.ndarray((N_SAMPLES,), dtype=complex, buffer=shm.buf)
    result = expensive_op(sig)
    # Drop the ndarray view first: an outstanding buffer export would
    # prevent the underlying mmap from closing cleanly.
    del sig
    shm.close()
    return result
@nvtx.annotate(color="blue")
def expensive_op(sig):
    """Stand-in for heavy work: sleep two seconds, then sum the signal."""
    time.sleep(2)
    total = np.sum(sig)
    return total
def clean_shm(shm_name):
    """Attach to the named shared-memory block, then free it for good."""
    handle = shared_memory.SharedMemory(name=shm_name)
    handle.close()
    # unlink() destroys the underlying block system-wide.
    handle.unlink()
if __name__ == "__main__":
    # Report how much data is being placed in shared memory.
    print(f"Total num_bytes: {SIGNAL.nbytes} B | {SIGNAL.nbytes / 1e9} GB")
    # Warm-up call in the parent process; its nvtx range always appears.
    test = np.random.randn(10)
    expensive_op(test)
    # Copy the signal into shared memory so children can attach by name.
    shared_mem = create_shm_array(SIGNAL)
    # "spawn" starts fresh interpreter processes instead of forking, so
    # each child re-imports the module and re-initializes the profiler
    # hooks, letting the children's nvtx ranges show up in the report.
    with get_context("spawn").Pool(os.cpu_count()) as p:
        p.map(worker, [shared_mem.name] * 2)
    clean_shm(shared_mem.name)