Tags: python, multiprocessing

How to run multiple potentially multithreaded shell commands in Python


I am trying to write a Python script that runs several instances of another, potentially multithreaded, program in parallel.

Consider a (C++) executable that can be run as run_task <sample_number> and spawns 10 threads. The sample numbers are in a Python list. What is the ideal way to execute them from a Python script such that, at any time, three executables are running?

Code snippet

import json

if __name__ == "__main__":

    # Path to the C++ executable; it spawns 10 threads to generate the output
    CPP_EXECUTABLE = "./GenerateTrimmedNTuple"
    samples = json.load(open("samples.json"))
    commands = [[CPP_EXECUTABLE, sample_name] for sample_name in samples]

    # Idea is to make a loop that ensures three commands are always running,
    # so that no more than 30 threads are spawned at any time
    nrunning = 0
    while nrunning < 3:
        # run a command and increment nrunning
        # if any command finishes, decrement nrunning
        pass

P.S. Previously, I split the samples across three bash scripts and ran all three simultaneously in a tmux session with three panes. I am hoping to do this in a better way.


Solution

  • You can use subprocess.run to run each process, and multiprocessing.pool.ThreadPool with a maximum number of workers to run them concurrently.

    import multiprocessing.pool
    import json
    import subprocess
    
    if __name__ == "__main__":
    
        # Path to the C++ executable; it spawns 10 threads to generate the output
        CPP_EXECUTABLE = "./GenerateTrimmedNTuple"
        samples = json.load(open("samples.json"))
        commands = [[CPP_EXECUTABLE, sample_name] for sample_name in samples]
        
        max_workers = 3  # at most 3 subprocesses at a time, hence at most 30 threads
        with multiprocessing.pool.ThreadPool(max_workers) as pool:

            def worker_function(command_args):
                # blocks until this subprocess exits
                subprocess.run(command_args, check=True)
                return command_args

            # imap_unordered yields each command as soon as it finishes
            for return_value in pool.imap_unordered(worker_function, commands):
                print(f"command: {return_value} finished!")
    
    

    check=True makes subprocess.run raise an exception (subprocess.CalledProcessError) if a process returns a non-zero exit code, which ends the entire application with an error. You can omit it, but check the other arguments of subprocess.run, such as capture_output.
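
    If you would rather log a failure and keep the remaining commands running, here is a minimal sketch of such a worker (the error handling is an illustration, not part of the original answer):

    import subprocess

    def worker_function(command_args):
        try:
            # capture_output=True stores the subprocess's stdout/stderr on the
            # result instead of letting them go to the terminal
            result = subprocess.run(command_args, check=True,
                                    capture_output=True, text=True)
        except subprocess.CalledProcessError as exc:
            # exc.returncode and exc.stderr describe the failed command
            print(f"command {command_args} failed ({exc.returncode}): {exc.stderr}")
            return None
        return result.stdout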


    You can wrap the loop in tqdm to get a progress bar, this way:

    import tqdm

    for return_value in tqdm.tqdm(pool.imap_unordered(worker_function, commands),
                                  total=len(commands)):
        print(f"command: {return_value} finished!")
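
    As an aside, the standard library's concurrent.futures offers an equivalent pool interface. A minimal sketch of the same approach, assuming the same samples.json as above (this alternative is not from the original answer):

    import concurrent.futures
    import json
    import subprocess

    CPP_EXECUTABLE = "./GenerateTrimmedNTuple"

    def run_command(command_args):
        subprocess.run(command_args, check=True)
        return command_args

    if __name__ == "__main__":
        with open("samples.json") as f:
            samples = json.load(f)
        commands = [[CPP_EXECUTABLE, sample_name] for sample_name in samples]

        # at most 3 worker threads, so at most 3 subprocesses at a time;
        # a thread pool is enough here because each thread just blocks in
        # subprocess.run while the external executable does the actual work
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            for command in executor.map(run_command, commands):
                print(f"command: {command} finished!")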