Tags: python, multiprocessing

How to collect process-local state after multiprocessing pool imap_unordered completes


After using a Pool from Python's multiprocessing to parallelize some computationally intensive work, I want to retrieve statistics that were kept local to each spawned process. I have no real-time interest in these statistics, so I want to avoid the overhead of keeping them in a synchronized data structure.

I've found suggestions to follow up with a second pool.map() call, using a different function that returns the state local to its worker. I believe this is incorrect, since there is no guarantee that the second invocation distributes exactly one job to every worker process in the pool (see the sketch after the skeleton below). Is there a mechanism that would achieve this?

Here is a skeleton snippet; it's unclear to me what can be done after imap_unordered() completes:

import multiprocessing as mp
import random

local_stats = {"success": 0, "fails": 0}

def do_work(_):
    # each worker process mutates its own copy of local_stats
    if random.choice([True, False]):
        local_stats["success"] += 1
    else:
        local_stats["fails"] += 1


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        results = list(pool.imap_unordered(do_work, range(1000)))

        # after .imap_unordered() completes, aggregate "local_stats" from each
        # process in the pool, either by retrieving each worker's local_stats
        # or by having the workers push those stats to the main process
        # ???
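
To illustrate the approach I believe to be incorrect, here is a sketch of the second-map idea (get_stats is a hypothetical name of mine, not taken from any of the suggestions I found):

import multiprocessing as mp
import random

local_stats = {"success": 0, "fails": 0}

def do_work(_):
    if random.choice([True, False]):
        local_stats["success"] += 1
    else:
        local_stats["fails"] += 1

def get_stats(_):
    # hypothetical helper: returns this worker's copy of local_stats
    return local_stats

if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        list(pool.imap_unordered(do_work, range(1000)))
        # Even when the two tasks below are dispatched one at a time, a single
        # idle worker can pull both off the shared task queue before the other
        # worker asks for work, so there is no guarantee that every worker
        # runs get_stats exactly once.
        print(pool.map(get_stats, range(2)))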

Solution

  • I don't know if this is the best solution (you could also just log to a file as you go, then parse each child's file afterwards; see the sketch after the example below), but you mentioned ensuring that exactly one task is distributed to every worker. That is commonly achieved with a Barrier. Synchronization primitives such as locks, queues, and barriers are awkward to pass to child processes after they have started, so we pass the barrier as an argument to the pool's initializer function, which runs once in each child at process creation. Here's an example:

    import multiprocessing as mp
    import random

    local_stats = {"success": 0, "fails": 0}

    def do_work(_):
        # mutates this worker's own copy of local_stats; no rebinding occurs,
        # so no "global" declaration is needed
        if random.choice([True, False]):
            local_stats["success"] += 1
        else:
            local_stats["fails"] += 1

    def init_pool(barrier):
        # pool initializer: runs once in each child at process creation;
        # synchronization primitives such as barriers must be handed over
        # here rather than as task arguments
        global sync_barrier
        sync_barrier = barrier

    def return_stats(_):  # dummy arg so it can be called via the "map" family
        sync_barrier.wait()  # block until every worker is waiting here;
                             # may raise BrokenBarrierError
        return local_stats

    if __name__ == "__main__":
        nprocs = 2  # shared by the Barrier and Pool constructors so the
                    # barrier's party count matches the worker count
        barrier = mp.Barrier(nprocs)
        with mp.Pool(processes=nprocs, initializer=init_pool, initargs=(barrier,)) as pool:
            results = list(pool.imap_unordered(do_work, range(1000)))

            # chunksize 1 dispatches each of the nprocs tasks individually;
            # the barrier then keeps any worker from taking a second task
            # before every worker has taken its first
            stats = list(pool.imap_unordered(return_stats, range(nprocs), 1))
            print(stats)
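
  Why this works: the second imap_unordered() dispatches exactly nprocs tasks with chunksize 1, and each task blocks at the barrier until all nprocs workers have picked one up, so no worker can take a second task before every worker has run return_stats once.

  For completeness, here is what the log-to-a-file alternative mentioned above could look like. This is a minimal sketch under assumptions of my own: a "stats" output directory and one append-only file per worker keyed by PID, so no cross-process synchronization is needed.

    import json
    import multiprocessing as mp
    import os
    import random

    STATS_DIR = "stats"  # hypothetical output directory, not from the question

    def do_work(_):
        ok = random.choice([True, False])
        # log as you go: each worker appends only to its own PID-named file,
        # so writes never race across processes
        with open(os.path.join(STATS_DIR, f"{os.getpid()}.log"), "a") as f:
            f.write(json.dumps({"success": ok}) + "\n")

    if __name__ == "__main__":
        os.makedirs(STATS_DIR, exist_ok=True)
        with mp.Pool(processes=2) as pool:
            list(pool.imap_unordered(do_work, range(1000)))

        # parse the per-child files after the pool has finished
        totals = {"success": 0, "fails": 0}
        for name in os.listdir(STATS_DIR):
            with open(os.path.join(STATS_DIR, name)) as f:
                for line in f:
                    if json.loads(line)["success"]:
                        totals["success"] += 1
                    else:
                        totals["fails"] += 1
        print(totals)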