I have a list with 12 items (each of which is a sample), and I need to analyse each of these samples using a function. The function is a wrapper for an external pipeline which needs four threads to run. Some pseudocode:
def my_wrapper(sample, other_arg):
cmd = 'external_pipeline --threads 4 --sample {0} --other {1}'.format(sample, other_arg)
subprocess.Popen(shlex.split(cmd),stderr=subprocess.PIPE,stdout=subprocess.PIPE).communicate()
Previously I ran the function for each sample serially using a loop, which worked, but is relatively inefficient given the my CPU has 20 cores. Example code:
sample_list = ['sample_'+str(x) for x in range(1,13)]
for sample in sample_list:
my_wrapper(sample)
I've tried to use multiprocessing to up the efficiency. I've done this successfully in the past using the starmap
function from multiprocessing
. Example:
with mp.Pool(mp.cpu_count()) as pool:
results = pool.starmap(my_wrapper, [(sample, other_arg) for sample in sample_list])
This approach has worked well previously when the function I'm calling requires only 1 thread/core per process. However, it doesn't seem to work as I naively expect/hope in my current circumstance. There are 12 samples, each needing to be analysed with 4 threads, but I only have 20 threads in total. Accordingly, I'd expect/hope for 5 samples to be run at a time (5 samples * 4 threads for each = 20 threads total). Instead, all samples appear to be analysed simultaneously, with all 20 threads being used, despite 48 threads being required for this to be efficient.
How might I efficiently run these samples so that only 5 are run in parallel (with each of these processes/jobs using 4 threads)? Do I need to specify a chunk size, or am I barking up the wrong tree with this thought?
Apologies for the vague title and post content, I wasn't sure how to word any of it better!
Limiting the number of cores in your multiprocessing pool will then save some cores free to run when running your wrapper. This will do the chunking for you:
with mp.Pool(mp.cpu_count() / num_cores_used_in_wrapper) as pool:
results = pool.starmap(my_wrapper, [(sample, other_arg) for sample in sample_list])