python-3.x, multiprocessing, starmap

Optimising python3 multiprocessing when each process requires multiple threads


I have a list with 12 items (each of which is a sample), and I need to analyse each of these samples using a function. The function is a wrapper for an external pipeline which needs four threads to run. Some pseudocode:

import shlex
import subprocess

def my_wrapper(sample, other_arg):
    cmd = 'external_pipeline --threads 4 --sample {0} --other {1}'.format(sample, other_arg)
    # Run the external pipeline and block until it finishes
    return subprocess.Popen(shlex.split(cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE).communicate()

Previously I ran the function for each sample serially in a loop, which worked but is relatively inefficient given that my CPU has 20 cores. Example code:

sample_list = ['sample_' + str(x) for x in range(1, 13)]
for sample in sample_list:
    my_wrapper(sample, other_arg)

I've tried to use multiprocessing to improve efficiency. I've done this successfully in the past using multiprocessing's starmap function. Example:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    results = pool.starmap(my_wrapper, [(sample, other_arg) for sample in sample_list])

This approach has worked well previously when the function I'm calling needs only 1 thread/core per process. However, it doesn't behave as I naively expect/hope in my current situation. There are 12 samples, each needing to be analysed with 4 threads, but I only have 20 threads in total. Accordingly, I'd expect/hope for 5 samples to run at a time (5 samples * 4 threads each = 20 threads). Instead, all 12 samples appear to be analysed simultaneously, so all 20 threads are oversubscribed, since running everything at once would actually need 48 threads (12 * 4) to be efficient.

How might I efficiently run these samples so that only 5 are run in parallel (with each of these processes/jobs using 4 threads)? Do I need to specify a chunk size, or am I barking up the wrong tree with this thought?

Apologies for the vague title and post content, I wasn't sure how to word any of it better!


Solution

  • Limit the number of worker processes in your multiprocessing pool so that enough cores remain free for the threads each wrapper call spawns. This does the chunking for you:

    with mp.Pool(mp.cpu_count() // num_cores_used_in_wrapper) as pool:
        results = pool.starmap(my_wrapper, [(sample, other_arg) for sample in sample_list])

    Note the integer division (//): in Python 3, / produces a float, and mp.Pool requires an integer number of processes.
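
  • Putting it together, a minimal runnable sketch (assuming 4 threads per wrapper call, as in the question; other_arg, the sample names and the pipeline command are placeholders):

    import multiprocessing as mp
    import shlex
    import subprocess

    THREADS_PER_JOB = 4  # threads requested by each external_pipeline call

    def my_wrapper(sample, other_arg):
        cmd = 'external_pipeline --threads {0} --sample {1} --other {2}'.format(
            THREADS_PER_JOB, sample, other_arg)
        # Block until this sample's pipeline run finishes
        return subprocess.Popen(shlex.split(cmd),
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE).communicate()

    if __name__ == '__main__':
        sample_list = ['sample_' + str(x) for x in range(1, 13)]
        other_arg = 'some_value'  # placeholder

        # 20 cores // 4 threads per job = 5 samples running at once;
        # max(1, ...) guards against machines with fewer cores than threads per job
        workers = max(1, mp.cpu_count() // THREADS_PER_JOB)
        with mp.Pool(workers) as pool:
            results = pool.starmap(my_wrapper,
                                   [(sample, other_arg) for sample in sample_list])

  • On the chunk-size question: starmap's chunksize only controls how many argument tuples are handed to each worker per batch; it does not limit how many worker processes run at once, so shrinking the pool as above is the right lever here.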