python multiprocessing pool

Splitting Pool of workers into different multiprocessing function calls


I have 4 functions that I want to run with multiprocessing, and each of the four has to iterate over 3 different sets of inputs. My computer has 8 cores, so I want 8 pool workers, but for that the workers have to call different functions at the same time. This is the code I have:

with Pool() as pool:

    u1, u2, u3 = pool.starmap(velo,Gamma_velo)
    ux, uy, uz = zip(*pool.starmap(velo_grad,Gamma_velo_grad))
    omega1, omega2, omega3 = pool.map(vort,Gamma_vort)
    omegax, omegaz, omegay = zip(*pool.map(vort_grad,Gamma_vort))

But I can only get 3 workers to work at the same time.

I hope it is clear. Thank you!


Solution

  • A statement such as u1, u2, u3 = pool.starmap(velo,Gamma_velo) does two things: (1) it submits N tasks to the pool, where N is the length of Gamma_velo, and (2) it blocks until all of those tasks have completed. Since each of your inputs evidently holds 3 items, at most 3 workers are ever busy at once.

    You have two invocations of method multiprocessing.pool.Pool.starmap and two invocations of method multiprocessing.pool.Pool.map. Ideally, the tasks submitted by all four invocations would run in parallel, limited only by your having 8 cores. This appears to be possible, since no invocation depends on the results of a previous one. So instead of the blocking starmap and map methods, we use the non-blocking starmap_async and map_async methods, which return multiprocessing.pool.AsyncResult instances representing the future completion of the submitted tasks. You can then call method get on these instances to retrieve the results. That call blocks until the corresponding tasks have completed, but we only call it after all tasks have been submitted:

    from multiprocessing import Pool

    with Pool() as pool:
    
        #u1, u2, u3 = pool.starmap(velo,Gamma_velo)
        async_result_1 = pool.starmap_async(velo,Gamma_velo)
    
        #ux, uy, uz = zip(*pool.starmap(velo_grad,Gamma_velo_grad))
        async_result_2 = pool.starmap_async(velo_grad,Gamma_velo_grad)
    
        #omega1, omega2, omega3 = pool.map(vort,Gamma_vort)
        async_result_3 = pool.map_async(vort,Gamma_vort)
    
        #omegax, omegaz, omegay = zip(*pool.map(vort_grad,Gamma_vort))
        async_result_4 = pool.map_async(vort_grad,Gamma_vort)
        
    
        # Now wait for each submitted task to complete and process
        # the results:
        u1, u2, u3 = async_result_1.get()
        ux, uy, uz = zip(*async_result_2.get())
        omega1, omega2, omega3 = async_result_3.get()
        omegax, omegaz, omegay = zip(*async_result_4.get())
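
For a version that can be run as-is, here is a minimal sketch of the same pattern. Since velo, velo_grad, vort and vort_grad are not shown in the question, it substitutes standard-library functions (pow, divmod, math.sqrt, math.modf) and made-up inputs. The name run_all and all input values are assumptions for illustration only.

```python
import math
from multiprocessing import Pool


def run_all(processes=8):
    """Submit four independent batches without blocking, then collect results."""
    # Hypothetical stand-ins for Gamma_velo, Gamma_velo_grad and Gamma_vort.
    gamma_velo = [(2, 3), (3, 2), (4, 2)]
    gamma_velo_grad = [(7, 2), (9, 4), (10, 3)]
    gamma_vort = [0.25, 2.25, 4.0]

    with Pool(processes=processes) as pool:
        # Each *_async call returns an AsyncResult immediately, so all
        # 12 tasks are queued before we wait on any of them.
        r1 = pool.starmap_async(pow, gamma_velo)          # stand-in for velo
        r2 = pool.starmap_async(divmod, gamma_velo_grad)  # stand-in for velo_grad
        r3 = pool.map_async(math.sqrt, gamma_vort)        # stand-in for vort
        r4 = pool.map_async(math.modf, gamma_vort)        # stand-in for vort_grad

        # get() blocks until that batch is done; the other batches keep
        # running in the pool while we wait.
        powers = r1.get()
        quotients, remainders = zip(*r2.get())
        roots = r3.get()
        fractions, wholes = zip(*r4.get())

    return {
        "powers": powers,
        "quotients": quotients,
        "remainders": remainders,
        "roots": roots,
        "fractions": fractions,
        "wholes": wholes,
    }


if __name__ == "__main__":
    for name, value in run_all().items():
        print(name, value)
```

On platforms where worker processes are started with the spawn method (the default on Windows and macOS), the pool has to be created under the if __name__ == "__main__": guard, as it is here.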