Tags: python, multiprocessing, concurrent.futures, pathos

Multiprocessing with nested parallelism in Python


I've been having many issues trying to run some code with Python's multiprocessing tools. I've found many examples of using different tools for the task, but most of them were too simplistic, so I am not sure whether I used them incorrectly or whether they were the wrong tools for the job in the first place.

I have a large program that runs on several large datasets. To save time, I'd like to process the datasets in parallel. In general, my case is as follows:

def funA(var1, var2, var3):
   # Some Code #

def funB(varB):
   # Some Code #
   
   funA(x1, x2, x3)
   funA(y1, y2, y3)

   # Some Code #

# Some Code #

funB(z1)
funB(z2)

# Some Code #

I'd like to run the instances of funB in parallel, and when the code reaches funA, I'd like its instances to be parallelized as well. So far I've tried several solutions.

Attempt 1:

from multiprocessing import Pool

def funA(var1, var2, var3):
   # Some Code #

def funB(varB):
   # Some Code #
   
   pool1 = Pool()
   pool1.apply_async(funA, [x1, x2, x3])
   pool1.apply_async(funA, [y1, y2, y3])

   # Some Code #

# Some Code #

pool0 = Pool()
pool0.map(funB, [z1, z2])

# Some Code #

This gave me some Windows error.

Attempt 2:

from pathos.multiprocessing import ProcessingPool, ThreadingPool
amap = ProcessingPool().amap
tmap = ThreadingPool().map

def funA(var1, var2, var3):
   # Some Code #

def funB(varB):
   # Some Code #
   
   tmap(funA, [x1, x2, x3])
   tmap(funA, [y1, y2, y3])

   # Some Code #

# Some Code #

amap(funB, [z1, z2])

# Some Code #

This also gave me some Windows error.

Attempt 3:

from concurrent.futures import ThreadPoolExecutor
Pool1 = ThreadPoolExecutor(max_workers=2)
Pool2 = ThreadPoolExecutor(max_workers=2)

def funA(var1, var2, var3):
   # Some Code #

def funB(varB):
   # Some Code #
   
   Pool2.submit(funA, x1, x2, x3)
   Pool2.submit(funA, y1, y2, y3)
   Pool2.shutdown(wait=True)

   # Some Code #

# Some Code #

Pool1.submit(funB, [z1, z2])
Pool1.shutdown(wait=True)

# Some Code #

This does not crash, but it doesn't execute the functions either. It skips right away to the code that follows the parallel calls. I've read that using shutdown(wait=True) should prevent that, but even after adding it, the result is the same: the functions never seem to execute.

I would be very grateful if someone could explain to me how to do this properly. I am using a Windows 11 device and Anaconda with Python 3.11. Thank you all in advance.


Solution

  • As I understand it, you have a function, e.g. funB, that you would like to run in parallel, and each invocation of funB in turn calls funA multiple times with different arguments; you would like these invocations of funA to run in parallel as well.

    In a perfect world you would create a multiprocessing pool to run your funB tasks in parallel, and funB would create its own multiprocessing pool to run its funA tasks in parallel. The first problem with this is that multiprocessing pools create what are known as daemon processes, i.e. processes that are automatically terminated when their parent process terminates. Daemon processes are not allowed to create child processes of their own, which is exactly what funB would need to do in order to create its own multiprocessing pool.

    There is some "clever" coding that can be done (search this site) so that the multiprocessing pool instead creates non-daemon processes, but this raises another problem. Let's say you need to invoke funB M times and each invocation creates a multiprocessing pool of size N because it needs to invoke funA N times. That is a total of M * N processes being created to run all M * N funA invocations in parallel. First, process creation, especially on Windows (which you say you are using), is rather expensive. Moreover, M * N might be considerably larger than the number of CPUs you have, and there is nothing to be gained by creating more processes than you have CPUs unless those processes do a fair amount of I/O or network processing and are therefore idle a large percentage of the time waiting for requests to complete.
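
    To see the first restriction concretely, here is a minimal sketch (the inner/outer names are just placeholders, not from your code) that tries to create a Pool inside a Pool worker; it should fail with an error along the lines of "daemonic processes are not allowed to have children":

    from multiprocessing import Pool

    def inner(x: int) -> int:
        return x * x

    def outer(n: int) -> int:
        # This function runs in a Pool worker, which is a daemon process,
        # so creating another Pool here is expected to raise
        # AssertionError: daemonic processes are not allowed to have children
        with Pool(2) as inner_pool:
            return sum(inner_pool.map(inner, range(n)))

    if __name__ == '__main__':
        with Pool(2) as outer_pool:
            # The error raised in the worker is re-raised here in the parent
            print(outer_pool.map(outer, [3, 4]))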

    I suggest, therefore, that you create both a multithreading pool and a multiprocessing pool. On the assumption that most of your processing is CPU-intensive, the multiprocessing pool size should not exceed the number of CPUs you have. I would use the multithreading pool to run your funB invocations concurrently, and each invocation of funB would be passed the single, shared multiprocessing pool, which it uses to execute its funA tasks (and any other CPU-intensive processing it might have) in parallel. For example:

    from multiprocessing.pool import Pool
    from multiprocessing.pool import ThreadPool
    from multiprocessing import cpu_count
    from functools import partial
    
    import time
    
    def fun_A(var1: int, var2: int, var3: int) -> int:
        # CPU-intensive code will be simulated
        print('fun_A called at time', time.time(), flush=True)
        time.sleep(3)
        return var1 * var2 * var3
    
    def fun_B(pool: Pool,
              x1: int,
              x2: int,
              x3: int,
              y1: int,
              y2: int,
              y3: int
        ) -> int:
        result1 = pool.apply_async(fun_A, args=(x1, x2, x3))
        result2 = pool.apply_async(fun_A, args=(y1, y2, y3))
    
        return result1.get() + result2.get()
    
    def main():
        args = [
            [1, 2, 3, 4, 5, 6],
            [10, 20, 30, 40, 50, 60],
            [100, 200, 300, 400, 500, 600],
        ]
    
        # First result should be (1 * 2 * 3) + (4 * 5 * 6) = 126
        # Subsequent results are 1000 times larger than the prior result
    
        thread_pool_size = len(args)
        # Each element of args is used to call fun_B, which
        # submits 2 fun_A tasks to the multiprocessing pool. But
        # there is no point in creating a pool size greater than
        # the number of CPUs we have if we assume that fun_A is
        # very CPU-intensive with minimal I/O or network activity:
        pool_size = min(cpu_count(), len(args) * 2)
    
        with ThreadPool(thread_pool_size) as thread_pool, \
             Pool(pool_size) as pool:
            t = time.monotonic()
            worker = partial(fun_B, pool)
            results = thread_pool.starmap(worker, args)
            elapsed_time = time.monotonic() - t
    
        print(results, 'elapsed time =', elapsed_time)
    
    if __name__ == '__main__':
        main()
    

    Prints:

    fun_A called at time 1741726531.5217073
    fun_A called at time 1741726531.5217073
    fun_A called at time 1741726531.5217073
    fun_A called at time 1741726531.5217073
    fun_A called at time 1741726531.5217073
    fun_A called at time 1741726531.5217073
    [126, 126000, 126000000] elapsed time = 3.155999999959022
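
    Note that all six fun_A calls start at essentially the same time, and since each one sleeps for 3 seconds the total elapsed time is only slightly more than 3 seconds, i.e. all of the fun_A work across the three fun_B invocations ran in parallel.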