After using a Pool from Python's multiprocessing to parallelize some computationally intensive work, I wish to retrieve statistics that were kept local to each spawned process. Specifically, I have no real-time interest in these statistics, so I do not want to bear the overhead that would be involved with using a synchronized data structure in which they could be kept.
I've found some suggestions where the idea is to use a second pool.map() with a different function that returns the state local to its worker. I believe this to be incorrect, since there is no guarantee that this second invocation would distribute exactly one job to every worker process in the pool. Is there a mechanism that would achieve this?
Here is a skeleton snippet; it's unclear to me what can be done after imap_unordered() completes.
import multiprocessing as mp
import random

local_stats = {"success": 0, "fails": 0}

def do_work(_):
    if random.choice([True, False]):
        local_stats["success"] += 1
    else:
        local_stats["fails"] += 1

if __name__ == "__main__":
    with mp.Manager() as manager:
        with mp.Pool(processes=2) as pool:
            results = list(pool.imap_unordered(do_work, range(1000)))
            # after .imap_unordered() completes, aggregate "local_stats" from each process in the pool
            # by either retrieving its local_stats, or having them push those stats to the main process
            # ???
I'm not sure this is the best solution (could you just log to a file as you go, then parse the files from each child afterwards? There's a rough sketch of that at the end.), but you mentioned ensuring that a task is distributed to every worker in the pool. That is commonly achieved with a Barrier. It is somewhat difficult to pass things like locks, queues, and so on to child processes after they exist, so we pass the barrier as an argument to the initializer function that runs in every child process at creation time. Here's an example:
import multiprocessing as mp
import random

local_stats = {"success": 0, "fails": 0}

def do_work(_):
    global local_stats
    if random.choice([True, False]):
        local_stats["success"] += 1
    else:
        local_stats["fails"] += 1

def init_pool(barrier):
    # save the barrier to the child process's globals as pool init
    # (the barrier must be sent to the child at process creation)
    global sync_barrier
    sync_barrier = barrier

def return_stats(_):  # needs a dummy arg so it can be called with the "map" functions
    global sync_barrier
    sync_barrier.wait()  # wait for the other worker processes to also be waiting
    # may raise BrokenBarrierError
    global local_stats
    return local_stats

if __name__ == "__main__":
    nprocs = 2  # reuse the same number for the barrier and the pool constructor
    barrier = mp.Barrier(nprocs)
    with mp.Pool(processes=nprocs, initializer=init_pool, initargs=(barrier,)) as pool:
        results = list(pool.imap_unordered(do_work, range(1000)))
        # force chunksize to 1 to ensure one return_stats task per child process
        stats = list(pool.imap_unordered(return_stats, range(nprocs), 1))
        print(stats)
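From there, turning the per-worker dictionaries collected in stats into a single total is a plain aggregation; a minimal sketch:

totals = {"success": 0, "fails": 0}
for s in stats:
    totals["success"] += s["success"]
    totals["fails"] += s["fails"]
print(totals)

And the log-to-a-file idea mentioned at the top could look roughly like this; the directory name and one-line-per-task format are just illustrative choices, not anything prescribed by multiprocessing. Each worker appends to a file named after its own pid, so no two processes write to the same file, and the parent parses the files after all tasks have finished (cleanup of old files between runs is omitted here).

import multiprocessing as mp
import os
import random

LOG_DIR = "stats_logs"  # hypothetical directory for the per-worker log files

def do_work(_):
    ok = random.choice([True, False])
    # each worker process appends one line per task to its own pid-named file
    with open(os.path.join(LOG_DIR, f"{os.getpid()}.log"), "a") as f:
        f.write("success\n" if ok else "fail\n")

if __name__ == "__main__":
    os.makedirs(LOG_DIR, exist_ok=True)
    with mp.Pool(processes=2) as pool:
        list(pool.imap_unordered(do_work, range(1000)))
    # list() above blocks until every task is done, so the files are complete here
    totals = {"success": 0, "fails": 0}
    for name in os.listdir(LOG_DIR):
        with open(os.path.join(LOG_DIR, name)) as f:
            for line in f:
                totals["success" if line.strip() == "success" else "fails"] += 1
    print(totals)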