Tags: python, multiprocessing, subprocess, executable

Using Python to execute programs on various processors through multiprocessing


I want to run different executables on different processors at the same time, so I took the example from here: Is there a way to assign different jobs (processes) to specific core in linux using python? and tried to run my executables with it. However, the for loop waits for each executable to finish before assigning a new process. Is there a way to assign a new process to the next core without waiting?

import os
from itertools import cycle
import multiprocessing as mp
import subprocess

def map_hack(AFF):
  my_pid = os.getpid()
  old_aff = os.sched_getaffinity(0)
  os.sched_setaffinity(0,AFF)
  return (my_pid, old_aff, os.sched_getaffinity(0))

PROCESSES = os.cpu_count()
_mycpus = cycle(os.sched_getaffinity(0))
cpus = [[next(_mycpus)] for x in range(PROCESSES)]




with mp.Pool(processes=PROCESSES) as pool:
  for x in pool.map(map_hack, cpus, chunksize=1):
    print("Try 2:My pid is {} and my old aff was {}, my new aff is {}".format(*x))
    z = str(x[2])     # e.g. "{0}"
    z = z[:-1]        # drop the trailing "}"
    z = int(z[-1:])   # keep only the last character as the core number
    print(z)
    if z == 0:
      print("program waits for this executable to finish")
      subprocess.run(["./exevision"])

pool.close()
pool.join()

Solution

  • I apologize for this being a rather long answer, but I hope it will be worth your while to read it carefully.

    First, I am not sure why you would want to limit each pool process to a single core as opposed to letting the underlying platform assign whatever core is free (perhaps you are operating under some misunderstanding?). But if you really do want to do this, then here is what I have to say:

    I don't believe that the post you referenced for assigning CPU affinity is relevant to your question. For one thing, it attempts to assign to each process the same 4 cores as opposed to assigning to each pool process its own, unique core. Secondly, the second approach it uses, i.e. using the multiprocessing.pool.Pool.map method, seems inadequate to me since the tasks being submitted via the map call could possibly be all executed by the same pool process resulting in repeatedly setting the same process's CPU affinity. That is, when you have a pool of size N and you submit N tasks, there is no guarantee that each of the N tasks will be executed by different pool processes; they could all be executed by the same pool process.
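
    To see the second point concretely, here is a small sketch of my own (not taken from the referenced post) that simply records which pool process handled each of N map tasks; because the tasks are trivial, you may well see fewer distinct pids than tasks:

    import os
    from collections import Counter
    from multiprocessing import Pool

    def which_worker(_):
        # Report the pid of the pool process that happened to run this task.
        return os.getpid()

    if __name__ == '__main__':
        N = os.cpu_count()
        with Pool(processes=N) as pool:
            pids = pool.map(which_worker, range(N), chunksize=1)
        # e.g. Counter({12345: 8}) if one fast worker grabbed every task
        print(Counter(pids))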

    I would instead use a shared integer value: as part of its initialization, each pool process reads the current value, uses it as its CPU affinity, and then increments it for the next pool process to use. These two operations must be done while holding a lock.

    The following code shows the general approach to initializing a multiprocessing pool with a pool initializer so that each pool process is assigned its own CPU. But again, when you submit tasks to this pool you have no control over which pool process will execute the task.

    import os
    from multiprocessing import Pool, Value
    
    affinity = Value('i', 0)
    
    def _myinit():
        my_pid = os.getpid()
        old_aff = os.sched_getaffinity(0)
        with affinity.get_lock():
            aff = affinity.value       # claim the next CPU index
            affinity.value += 1        # advance it for the next pool process
        os.sched_setaffinity(0, [aff])
        new_aff = os.sched_getaffinity(0)
        print(f'My pid is {my_pid} and my old aff was {old_aff}, my new aff is {new_aff}')
    
    
    PROCESSES = os.cpu_count()
    
    with Pool(processes=PROCESSES, initializer=_myinit) as pool:
        ... # submit some tasks
        # Wait for tasks to complete
        pool.close()
        pool.join()
    

    Prints:

    My pid is 70 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {0}
    My pid is 71 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {1}
    My pid is 72 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {2}
    My pid is 73 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {3}
    My pid is 74 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {4}
    My pid is 76 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {6}
    My pid is 75 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {5}
    My pid is 77 and my old aff was {0, 1, 2, 3, 4, 5, 6, 7}, my new aff is {7}
    

    But if your purpose is to execute instances of an external program in parallel using subprocess.run, you would generally want to use a multithreading pool instead: threads are cheaper to create than processes, and since subprocess.run creates a new process anyway, you will not be constrained by the Global Interpreter Lock. So consider the following code, which ensures that each call to subprocess.run is made on a different CPU and that the external programs are executed in parallel:

    import os
    from multiprocessing.pool import ThreadPool
    from threading import Lock
    import subprocess
    
    affinity = 0
    lock = Lock()
    
    def _myinit():
        global affinity
    
        my_pid = os.getpid()
        old_aff = os.sched_getaffinity(0)
        with lock:
            aff = affinity
            affinity += 1
        os.sched_setaffinity(0, [aff])
        new_aff = os.sched_getaffinity(0)
        print(f'My pid is {my_pid} and my old aff was {old_aff}, my new aff is {new_aff}')
    
        subprocess.run('ls -l test.txt', shell=True)
    
    
    PROCESSES = os.cpu_count()
    
    with ThreadPool(processes=PROCESSES, initializer=_myinit) as pool:
        pool.close()
        pool.join()
    

    But what does this achieve for you? Even though each call to subprocess.run is executed on a different CPU, the ls command itself will be executed in a new process and thus a new thread for which no CPU affinity has been set.
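
    If you really did need the external program itself pinned to a particular core, one option (a sketch of my own, Linux-specific, not something the code above does) is to launch it through the taskset utility, which sets the child's affinity before the program runs:

    import subprocess

    def run_on_cpu(cmd, cpu):
        # 'taskset -c <cpu>' restricts the launched program to that core (Linux only).
        subprocess.run(f'taskset -c {cpu} {cmd}', shell=True)

    run_on_cpu('ls -l test.txt', 0)   # pin this ls invocation to CPU 0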

    I would not worry about setting CPU affinity at all nor be concerned which thread or process is executing a task. If you want to be running multiple external programs in parallel, the simplest and most efficient way to do this is as follows:

    from multiprocessing.pool import ThreadPool
    import subprocess
    
    def worker():
        subprocess.run('ls -l test.txt', shell=True)
    
    N_PROGRAMS = 4
    
    with ThreadPool(processes=N_PROGRAMS) as pool:
        for _ in range(N_PROGRAMS):
            pool.apply_async(worker)
        # Wait for all submitted tasks to complete:
        pool.close()
        pool.join()
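
    Finally, if you do not need a pool at all, the most direct way to start several copies of an executable without waiting on each one (a sketch based on the question's ./exevision, assuming that file exists and is executable) is to launch them with subprocess.Popen, which returns immediately, and wait for them afterwards:

    import subprocess

    # Start all instances without blocking; Popen returns as soon as each
    # child process has been launched.
    procs = [subprocess.Popen(['./exevision']) for _ in range(4)]

    # Do any other work here, then wait for every instance to finish.
    for p in procs:
        p.wait()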