I am trying to write a Python script that runs another, potentially multithreaded, program several times in order to parallelise the work further.
Consider a (C++) executable which can be run as run_task <sample_number> and which spawns 10 threads. The sample numbers are stored in a Python list. What is the ideal way to execute them from a Python script such that, at any time, three executables are always running?
Code snippet:

import json

if __name__ == "__main__":
    # Path to the C++ executable; it spawns 10 threads to generate the output
    CPP_EXECUTABLE = "./GenerateTrimmedNTuple"
    samples = json.load(open("samples.json"))
    commands = [[CPP_EXECUTABLE, sample_name] for sample_name in samples]

    # The idea is a loop that keeps 3 commands running at any time,
    # thus ensuring no more than 30 threads are spawned at once
    nrunning = 0
    while nrunning < 3:
        # run a command and increment nrunning
        # if any command finishes, decrement nrunning
        pass
P.S. Previously, I split the samples across three bash scripts and ran all three simultaneously in a tmux session with three panes. I'm hoping to do this in a better way.
You can use subprocess.run to run each process, and multiprocessing.pool.ThreadPool with a maximum number of workers to run them concurrently.
import multiprocessing.pool
import json
import subprocess

if __name__ == "__main__":
    # Path to the C++ executable; it spawns 10 threads to generate the output
    CPP_EXECUTABLE = "./GenerateTrimmedNTuple"
    samples = json.load(open("samples.json"))
    commands = [[CPP_EXECUTABLE, sample_name] for sample_name in samples]

    max_workers = 3

    def worker_function(command_args):
        # Runs one command to completion; the pool size caps how many
        # commands execute at the same time
        subprocess.run(command_args, check=True)
        return command_args

    with multiprocessing.pool.ThreadPool(max_workers) as pool:
        for return_value in pool.imap_unordered(worker_function, commands):
            print(f"command: {return_value} finished!")
check=True makes subprocess.run raise an exception, ending the entire application with an error, if a process returns a non-zero exit code. You can omit it, but check the other arguments of subprocess.run, such as capture_output.
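If you would rather log failures and let the remaining commands keep running instead of aborting, here is a minimal sketch (it reuses the worker_function name from above; the exact handling is up to you):

import subprocess

def worker_function(command_args):
    # capture_output=True collects stdout/stderr instead of letting the
    # subprocess print to the terminal; text=True decodes them to str
    result = subprocess.run(command_args, capture_output=True, text=True)
    if result.returncode != 0:
        # Report the failure but let the pool continue with the other commands
        print(f"command {command_args} failed:\n{result.stderr}")
    return command_args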
You can wrap it in tqdm to get a progress bar, like this:

for return_value in tqdm.tqdm(pool.imap_unordered(worker_function, commands), total=len(commands)):
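For context, a sketch of the full loop with the progress bar (this assumes the tqdm package is installed, and reuses worker_function, commands, and max_workers from the answer above):

import multiprocessing.pool
import tqdm

with multiprocessing.pool.ThreadPool(max_workers) as pool:
    # total= is needed because imap_unordered yields results lazily,
    # so tqdm cannot infer the number of items from the iterator itself
    for return_value in tqdm.tqdm(pool.imap_unordered(worker_function, commands),
                                  total=len(commands)):
        print(f"command: {return_value} finished!")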