Tags: python, process, multiprocessing, pool, pathos

Python Process Pool with custom Process not able to respawn child processes


I have subclassed multiprocess.Process (multiprocess is a fork of the multiprocessing library) like so:

# source/process.py
import time
import traceback

import multiprocess

# S_Logger is a project-specific logging helper (not shown here).


class Process(multiprocess.Process):

    def __init__(self, test_name: str, *args, **kwargs) -> None:
        multiprocess.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocess.Pipe()
        self._exception = None
        self._test_name = test_name

    def run(self) -> None:
        try:
            start = time.perf_counter()
            logger = S_Logger(self._test_name).get_logger()
            logger.info('EXECUTION OF %s HAS STARTED.', self._test_name)
            multiprocess.Process.run(self)
            self._child_conn.send(None)

        except Exception as e:
            tb = traceback.format_exc()
            logger.error(f'EXCEPTION OCCURRED: {tb}')
            self._child_conn.send((e, tb))

        finally:
            logger.info('EXECUTION OF %s HAS ENDED.', self._test_name)
            end = time.perf_counter()
            logger.info(f'FINISHED in {round(end-start, 2)} second(s)')

When I create a normal Process using this class, everything works perfectly, including the logs. Now I want to create a process pool of these customized processes, but I ran into a problem with respawning them after their lifetime ends. Here is how I create the pool, with the additional maxtasksperchild=1 argument:

import time

import multiprocess

from source.process import Process

ctx = multiprocess.get_context()

def run_tests(self):

    def worker(x):
        print(x**2)
        time.sleep(1)

    with ctx.Pool(processes=4, maxtasksperchild=1) as pool:

        nums = range(10)
        ctx.Process = Process(test_name='test_name')
        pool.map(worker, nums)

This gives me the following output:

0
1
4
9
Exception in thread Thread-1 (_handle_workers):
Traceback (most recent call last):
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 513, in _handle_workers
    cls._maintain_pool(ctx, Process, processes, pool, inqueue,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 337, in _maintain_pool
    Pool._repopulate_pool_static(ctx, Process, processes, pool,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 319, in _repopulate_pool_static
    w = Process(ctx, target=worker,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 181, in Process
    return ctx.Process(*args, **kwds)
TypeError: 'Process' object is not callable

This raises two questions:

  1. Why is there no logging? If I don't use the pool, the logs appear correctly.
  2. Why, after four processes have executed, do the new ones that should be respawned fail to be created (the "not callable" error)? If I remove the maxtasksperchild argument, it works perfectly (0, 1, 4, 9, 16, 25, ...).

Solution

    > This gives me the following output

    The error occurs because you are replacing ctx.Process (a class) with an instance of your own subclass. Instances are not callable unless their class defines a __call__ method. But even if you assigned the subclass itself (ctx.Process = Process), it still would not work. Your subclass inherits _Popen from multiprocess.Process, and that method starts the process by looking up ctx.Process._Popen on the same context again. That lookup now returns your subclass, so the method ends up calling itself and you get a recursion error.
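To see both failure modes without starting a real process, here is a small sketch. It uses the standard-library multiprocessing. multiprocess mirrors its API, so the same behaviour is expected there. The sketch calls Pool.Process directly. That staticmethod is an undocumented internal of CPython 3.8+ which simply forwards to ctx.Process(*args, **kwds), as the last frame of your traceback shows. MyProcess, reproduce_not_callable and reproduce_recursion are hypothetical names used only for illustration.

```python
import multiprocessing
from multiprocessing.pool import Pool
from types import SimpleNamespace


class MyProcess(multiprocessing.Process):
    """Stand-in for the question's subclass (test_name as a keyword)."""

    def __init__(self, *args, test_name="", **kwargs):
        super().__init__(*args, **kwargs)
        self.test_name = test_name


def reproduce_not_callable():
    # Pool.Process (internal staticmethod, CPython 3.8+) just calls
    # ctx.Process(*args, **kwds). Here ctx.Process is an *instance*.
    fake_ctx = SimpleNamespace(Process=MyProcess(test_name="t"))
    try:
        Pool.Process(fake_ctx, target=print)
    except TypeError as exc:
        return str(exc)
    return None


def reproduce_recursion():
    # Assigning the subclass itself to the real context also fails:
    # MyProcess inherits _Popen from multiprocessing.Process, which looks up
    # ctx.Process._Popen again. That lookup now returns MyProcess._Popen itself.
    ctx = multiprocessing.get_context()
    ctx.Process = MyProcess
    try:
        MyProcess(target=print).start()  # recursion happens before any spawn
    except RecursionError:
        return "RecursionError"
    finally:
        del ctx.Process  # undo the patch; the class attribute is visible again
    return None


if __name__ == "__main__":
    print(reproduce_not_callable())  # 'MyProcess' object is not callable
    print(reproduce_recursion())     # RecursionError
```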

    > Why is there no logging? If I don't use the pool, the logs appear correctly.

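    This is because you never actually succeeded in patching the pool to use your Process subclass. The pool's workers are ordinary processes, so your run() method, and the logging inside it, never executes. This also ties into your second question (read on).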

    > Why, after four processes have executed, do the new ones that should be respawned fail to be created (the "not callable" error)? If I remove the maxtasksperchild argument, it works perfectly (0, 1, 4, 9, 16, 25, ...).

    This happens because the pool creates its worker processes as soon as it is constructed, on the line with ctx.Pool(processes=4, maxtasksperchild=1) as pool:. You apply your patch after the workers have started, so it has no effect unless the pool has to start processes again, and that is exactly what maxtasksperchild causes. With maxtasksperchild=1, each worker exits after completing one task. The pool's worker-handler thread then calls ctx.Process(...) to start a replacement. Because ctx.Process is now an instance, that call raises the TypeError, which is why four results (one from each original worker) are printed before the traceback. If you don't set maxtasksperchild, the workers never exit and the pool never has to start another process. The faulty patch is therefore never used.

    Regardless, here is a better way to patch the pool so that it does what you want:

    from multiprocess.pool import Pool
    from functools import partial
    import multiprocess
    import time
    
    class Process(multiprocess.Process):
    
        def __init__(self, *args, test_name='', **kwargs) -> None:
    
            multiprocess.Process.__init__(self, *args, **kwargs)
            self._parent_conn, self._child_conn = multiprocess.Pipe()
            self._exception = None
            self._test_name = test_name
    
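        def run(self) -> None:
            # Put your own logic (logging, timing, error reporting) around this call.
            # The call itself is required: it runs the pool's worker loop, and
            # without it no task is ever executed and pool.map() blocks forever.
            multiprocess.Process.run(self)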
    
    
    def _Process(ctx, *args, **kwds):
        return ctx.MyProcess(*args, **kwds)
    
    def worker(x):
        print(x ** 2)
        time.sleep(1)
    
    if __name__ == "__main__":
        ctx = multiprocess.get_context()
    
        # Some patching, we add our subclass as an attribute to the context
        ctx.MyProcess = Process
        
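        # Bind test_name with functools.partial so it is passed as a keyword argument whenever the pool starts a process. Pretty lazy but gets the job done.
        # Note: this patches the Pool class itself, so every Pool created afterwards in this program is affected.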
        test_name = 'test_name'
        Pool.Process = partial(_Process, test_name=test_name)
    
        with ctx.Pool(processes=4, maxtasksperchild=1) as pool:
            nums = range(10)
            pool.map(worker, nums)
    

    Note that test_name is now an optional keyword argument. This is so that it works with functools.partial, which passes it as a keyword. You probably want to add checks so that a valid value is always passed.