
pathos pools: Renew worker processes after N tasks


I am building a parallel Python application that essentially calls a C wrapper around an external library. Parallelism is needed so that computations can run simultaneously on all CPU cores.

I ended up using pathos.multiprocessing.ProcessPool, but these pools lack the maxtasksperchild argument of the standard multiprocessing.Pool constructor (see the multiprocessing.Pool documentation). I need this feature because the C library relies on the process clock to enforce some execution time limits, and those limits are eventually reached as tasks pile up in the same worker process.
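To illustrate why worker reuse matters here: time.process_time() reads the CPU clock of the calling process, so in a long-lived worker it keeps growing from task to task, whereas a freshly started worker begins near zero. The sketch below uses only the standard multiprocessing.Pool; burn_cpu and run are hypothetical names, and the CPU-burning loop merely stands in for the C library call.

```python
import os
import time
from multiprocessing import Pool


def burn_cpu(task_id):
    """Hypothetical stand-in for the C call: spin for ~20 ms of CPU time,
    then report this worker's PID and its accumulated process CPU clock."""
    start = time.process_time()
    while time.process_time() - start < 0.02:
        pass
    return os.getpid(), time.process_time()


def run(maxtasksperchild, num_tasks=10):
    # A single worker makes the accumulation easy to see.
    with Pool(processes=1, maxtasksperchild=maxtasksperchild) as pool:
        return pool.map(burn_cpu, range(num_tasks), chunksize=1)


if __name__ == "__main__":
    reused = run(maxtasksperchild=None)  # one long-lived worker runs every task
    recycled = run(maxtasksperchild=1)   # a fresh worker is started for each task
    print("reused:  ", [round(t, 2) for _, t in reused])
    print("recycled:", [round(t, 2) for _, t in recycled])
```

In the "reused" run the clock values keep climbing because every task is charged to the same process; in the "recycled" run each task reports its own PID and only that worker's start-up cost plus one task.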

Is there a way to ask the ProcessPool manager to renew worker processes after a given number of tasks?

Example code to clarify my intent:

```python
from pathos.pools import ProcessPool
from os import getpid
import collections

def print_pid(task_id):
    pid = getpid()
    return pid

if __name__ == "__main__":
    NUM_TASKS = 50
    MAX_PER_CHILD = 2

    # limit each process to at most MAX_PER_CHILD tasks:
    # we would like the pool to exit the process and spawn a new one
    # when a task counter reaches the limit;
    # the 'maxtasksperchild' argument below would work with standard 'multiprocessing'
    pool = ProcessPool(ncpu=2, maxtasksperchild=MAX_PER_CHILD)
    results = pool.map(print_pid, range(NUM_TASKS), chunksize=1)

    tasks_per_pid = dict(collections.Counter(results))
    print(tasks_per_pid)

# printed result
# {918: 8, 919: 6, 920: 6, 921: 6, 922: 6, 923: 6, 924: 6, 925: 6}
# observe that all processes did more than MAX_PER_CHILD tasks
```



Solution

  • In pathos.multiprocessing there are two pools: ProcessPool and _ProcessPool. The former is designed around an augmented pool lifecycle that minimizes start-up time and adds persistence and restart capabilities; however, it lacks some of the multiprocessing keywords, such as maxtasksperchild. The latter (_ProcessPool) sits one level lower in the API design and provides an interface identical to the multiprocessing Pool interface, except that it serializes with dill. So take a look at _ProcessPool.
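Applied to the example from the question, a sketch might look like the following. It assumes _ProcessPool can be imported from pathos.multiprocessing as described above and accepts the same constructor arguments as multiprocessing.Pool. The fallback to the standard multiprocessing.Pool is an addition for this sketch only, so that it also runs without pathos installed; that fallback serializes with pickle rather than dill.

```python
import collections
from os import getpid

try:
    # _ProcessPool exposes the multiprocessing.Pool interface (with dill),
    # so it should accept maxtasksperchild.
    from pathos.multiprocessing import _ProcessPool as Pool
except ImportError:
    # Assumed fallback, not part of pathos: same constructor signature,
    # but standard pickle serialization.
    from multiprocessing import Pool


def print_pid(task_id):
    return getpid()


if __name__ == "__main__":
    NUM_TASKS = 50
    MAX_PER_CHILD = 2

    # First positional argument is the number of worker processes;
    # each worker exits after MAX_PER_CHILD tasks and is replaced.
    pool = Pool(2, maxtasksperchild=MAX_PER_CHILD)
    try:
        # chunksize=1 so that one "task" is exactly one input item
        results = pool.map(print_pid, range(NUM_TASKS), chunksize=1)
    finally:
        pool.close()
        pool.join()

    tasks_per_pid = dict(collections.Counter(results))
    print(tasks_per_pid)
```

Because maxtasksperchild counts chunks, chunksize=1 matters here: with larger chunks a worker could process more than MAX_PER_CHILD items before being replaced. Barring PID reuse by the operating system, no PID should appear more than MAX_PER_CHILD times in the printed dictionary.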