
How to do inter-process communication with pathos/ppft?


I'm using the pathos framework to run tasks concurrently in separate processes. Under the hood, this is done with ppft, which is part of pathos. My current approach uses a pathos.multiprocessing.ProcessPool instance. Multiple jobs are submitted to it one at a time, without blocking, using apipe(). The main process then supervises the worker processes using ready() and get().

This works fine, but I only receive a worker's result once its task has finished. I need a way to get intermediate results from a worker while it is still running. I can't find anything clear about this in the pathos/ppft docs, but hints there and elsewhere suggest that their features should make it possible. How do you do inter-process communication with pathos/ppft in combination with a ProcessPool?

The following demo code illustrates my approach. How could I send intermediate results to the main process? For example, how could a worker report the list of primes found so far every time its length is a multiple of 100?

#!/usr/bin/env python3

import pathos


def worker_func(limit):
    """
    Dummy task: Find list of all prime numbers smaller than "limit".
    """
    return limit, [
        num for num in range(2, limit)
        if all(num % i != 0 for i in range(2, num))
    ]


# The guard keeps worker processes from re-running this block on
# platforms that start them by re-importing the main module
if __name__ == "__main__":
    pool = pathos.pools.ProcessPool()

    jobs = []
    jobs.append(pool.apipe(worker_func, 10000))
    jobs.append(pool.apipe(worker_func, 15000))
    jobs.append(pool.apipe(worker_func, 20000))

    # Poll until every job has delivered its result
    count_done_jobs = 0
    while count_done_jobs < len(jobs):
        for job_idx, job in enumerate(jobs):
            if job is not None and job.ready():
                limit, primes = job.get()
                jobs[job_idx] = None
                count_done_jobs += 1
                print("Job {}: There are {} primes smaller than {}."
                      .format(job_idx, len(primes), limit))

Solution

  • Okay, I figured it out. pathos uses multiprocess for the ProcessPool, and multiprocess is a fork of Python's standard multiprocessing module with only minor differences (mentioned here). So you can do inter-process communication the multiprocessing way, following the example in the Python docs, but using multiprocess.Pipe() instead of multiprocessing.Pipe().

    Applying this to my example code (adapted to count the primes and report progress roughly every 10%):

    #!/usr/bin/env python3
    
    import pathos
    import multiprocess as mp
    from multiprocess.connection import wait
    
    
    WAIT_TIMEOUT_SECS = 1.0
    
    
    def worker_func(worker_no, conn, limit):
        """
        Dummy task: Count all prime numbers smaller than "limit".
        """
        count = 0
        for num in range(2, limit):
            if all(num % i != 0 for i in range(2, num)):
                count += 1
    
            # Report progress to the main process now and then
            if num % (limit // 10) == 0:
                percent_done = int(num / limit * 100)
                conn.send([worker_no, percent_done, count, num])
        return limit, count
    
    
    def read_progress(conn):
        """
        Print one progress report received on "conn".
        Return False if the worker's end of the pipe has been closed.
        """
        try:
            worker_no, percent_done, count, num = conn.recv()
        except EOFError:
            # There's nothing left to receive and the other end was closed.
            return False
        print("Worker {} is {}% done: There are {} primes smaller than {}."
              .format(worker_no, percent_done, count, num))
        return True
    
    
    if __name__ == "__main__":
        pool = pathos.pools.ProcessPool()
    
        # Make workers, keyed by worker number so that removing a finished
        # worker doesn't shift the indices (and labels) of the others
        workers, parent_pipes = {}, {}
        for worker_no, limit in enumerate([50000, 80000, 100000]):
            parent_conn, child_conn = mp.Pipe()
            workers[worker_no] = pool.apipe(worker_func, worker_no, child_conn, limit)
            parent_pipes[worker_no] = parent_conn
    
        # Watch workers
        while workers:
            # Read progress reports, waiting at most WAIT_TIMEOUT_SECS
            for conn in wait(list(parent_pipes.values()), WAIT_TIMEOUT_SECS):
                read_progress(conn)
    
            # Handle done workers
            done = [no for no, worker in workers.items() if worker.ready()]
            for worker_no in done:
                limit, count = workers.pop(worker_no).get()
                conn = parent_pipes.pop(worker_no)
                # Drain reports sent after the last wait() before dropping the pipe
                while conn.poll() and read_progress(conn):
                    pass
                conn.close()
                print("Worker {} has completed: There are {} primes smaller than {}."
                      .format(worker_no, count, limit))
    
        pool.close()
        pool.join()
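The pipe-based version above reports percentages rather than the question's original ask. Below is a minimal sketch of that ask, which sends the primes found so far whenever the list length is a multiple of 100. It swaps in a technique the answer does not use: a single `Manager().Queue()` shared by all workers instead of one Pipe per worker. A Manager queue is a picklable proxy, so one queue can be passed to every task, and the main process needs no `connection.wait()` bookkeeping. So that it runs even where pathos isn't installed, it falls back to the stdlib `multiprocessing` and submits jobs with `Pool.apply_async`. With pathos you would presumably submit via `pool.apipe(find_primes, worker_no, limit, progress)` instead. The names `find_primes`, `print_report` and `REPORT_EVERY` are hypothetical.

```python
#!/usr/bin/env python3
import queue

try:
    import multiprocess as mp  # the multiprocessing fork that pathos uses
except ImportError:
    import multiprocessing as mp  # stdlib original with the same API

REPORT_EVERY = 100  # hypothetical: report whenever len(primes) is a multiple of this


def find_primes(worker_no, limit, progress):
    """Find all primes below limit.

    Each time the list length reaches a multiple of REPORT_EVERY, put
    (worker_no, copy of the list so far) on the progress queue.
    """
    primes = []
    for num in range(2, limit):
        if all(num % i != 0 for i in range(2, num)):
            primes.append(num)
            if len(primes) % REPORT_EVERY == 0:
                # Copy so later appends can't alter a report already sent
                progress.put((worker_no, list(primes)))
    return limit, primes


def print_report(report):
    worker_no, primes = report
    print("Worker {}: {} primes so far, largest is {}."
          .format(worker_no, len(primes), primes[-1]))


if __name__ == "__main__":
    with mp.Manager() as manager, mp.Pool(3) as pool:
        # Manager queue proxies can be passed to pool tasks as ordinary
        # arguments; a plain mp.Queue() cannot be shared that way
        progress = manager.Queue()
        jobs = {no: pool.apply_async(find_primes, (no, limit, progress))
                for no, limit in enumerate([2000, 3000, 4000])}

        while jobs:
            # Block up to 0.5 s for a report instead of spinning on ready()
            try:
                print_report(progress.get(timeout=0.5))
            except queue.Empty:
                pass
            for no in [n for n, job in jobs.items() if job.ready()]:
                limit, primes = jobs.pop(no).get()
                print("Worker {} has completed: {} primes smaller than {}."
                      .format(no, len(primes), limit))

        # Drain reports still queued after the last job finished
        while True:
            try:
                print_report(progress.get_nowait())
            except queue.Empty:
                break
```

Because every worker shares one queue, each report carries its worker number. A completion message may be printed before the last few progress reports from the same worker, since those are only drained afterwards.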