I have four functions that I want to run with multiprocessing, and each of the four has to iterate over three different sets of inputs. My computer has 8 cores, so I want 8 pool workers, but for that the workers would have to call different functions at the same time. This is the code I have:
    with Pool() as pool:
        u1, u2, u3 = pool.starmap(velo, Gamma_velo)
        ux, uy, uz = zip(*pool.starmap(velo_grad, Gamma_velo_grad))
        omega1, omega2, omega3 = pool.map(vort, Gamma_vort)
        omegax, omegaz, omegay = zip(*pool.map(vort_grad, Gamma_vort))
But I can only get 3 workers to work at the same time.
I hope it is clear. Thank you!
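When you have a statement such as u1, u2, u3 = pool.starmap(velo, Gamma_velo), you are (1) submitting N tasks to the pool, where N is the length of Gamma_velo, and (2) blocking until all submitted tasks have completed. Since each of your input sets has only 3 elements, no more than 3 workers are ever busy at the same time.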
You have two invocations of multiprocessing.pool.Pool.starmap and two invocations of multiprocessing.pool.Pool.map. Ideally, the tasks submitted by all four invocations would run in parallel, limited only by your having 8 cores. This is possible because no invocation depends on the results of a previous one. So instead of the blocking starmap and map methods, use the non-blocking starmap_async and map_async methods, which return multiprocessing.pool.AsyncResult instances representing the future completion of the submitted tasks. You can then call get on these instances to retrieve the results. That call blocks until the corresponding tasks have completed, but we only make it after all tasks have been submitted:
    from multiprocessing import Pool

    with Pool() as pool:
        # Submit all four batches without blocking:
        #u1, u2, u3 = pool.starmap(velo,Gamma_velo)
        async_result_1 = pool.starmap_async(velo, Gamma_velo)
        #ux, uy, uz = zip(*pool.starmap(velo_grad,Gamma_velo_grad))
        async_result_2 = pool.starmap_async(velo_grad, Gamma_velo_grad)
        #omega1, omega2, omega3 = pool.map(vort,Gamma_vort)
        async_result_3 = pool.map_async(vort, Gamma_vort)
        #omegax, omegaz, omegay = zip(*pool.map(vort_grad,Gamma_vort))
        async_result_4 = pool.map_async(vort_grad, Gamma_vort)
        # Now wait for each submitted task to complete and process
        # the results. These calls must stay inside the with block,
        # because leaving the block terminates the pool:
        u1, u2, u3 = async_result_1.get()
        ux, uy, uz = zip(*async_result_2.get())
        omega1, omega2, omega3 = async_result_3.get()
        omegax, omegaz, omegay = zip(*async_result_4.get())
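
If you want to try the pattern in isolation, here is a minimal self-contained sketch. The bodies of velo, velo_grad, vort and vort_grad are hypothetical stand-ins (your real functions are not shown in the question), the input lists are made-up values, and run_all is just a helper name I introduced for illustration:

```python
from multiprocessing import Pool


# Hypothetical stand-ins for the functions in the question.
def velo(gamma, scale):
    return gamma * scale


def velo_grad(gamma, scale):
    # Returns one (x, y, z) triple per input.
    return (gamma * scale, gamma + scale, gamma - scale)


def vort(gamma):
    return gamma ** 2


def vort_grad(gamma):
    # Returns one (x, y, z) triple per input.
    return (gamma, 2 * gamma, -gamma)


def run_all(processes=None):
    """Submit all four batches first, then collect the results."""
    # Made-up inputs: three elements per set, as in the question.
    gamma_velo = [(1, 10), (2, 10), (3, 10)]
    gamma_velo_grad = [(1, 1), (2, 1), (3, 1)]
    gamma_vort = [1, 2, 3]

    with Pool(processes) as pool:
        # None of these calls block, so all 12 tasks are queued at once.
        r1 = pool.starmap_async(velo, gamma_velo)
        r2 = pool.starmap_async(velo_grad, gamma_velo_grad)
        r3 = pool.map_async(vort, gamma_vort)
        r4 = pool.map_async(vort_grad, gamma_vort)

        # get() blocks, so call it only after everything is submitted,
        # and while the pool is still alive.
        u = r1.get()
        u_grads = list(zip(*r2.get()))      # transpose to (x's, y's, z's)
        omega = r3.get()
        omega_grads = list(zip(*r4.get()))  # transpose to (x's, y's, z's)

    return u, u_grads, omega, omega_grads


if __name__ == "__main__":
    # The guard is needed on platforms that start workers with "spawn".
    print(run_all())
```

With 12 tasks queued up front, a pool of 8 workers has enough work to keep every process busy, which is not the case when each blocking call submits only 3 tasks.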