I'm using the pathos framework to run tasks concurrently in different processes. Under the hood, this is done with ppft, which is part of pathos. My current approach uses a pathos.multiprocessing.ProcessPool instance. Multiple jobs are submitted to it, one at a time without blocking, using apipe(). The main process then supervises the worker processes using ready() and get().

This works fine, but the results of the worker processes are only received after they finish (i.e. after the process has ended). However, I need a way to get intermediate results from a worker process while it is still running. I can't find anything clear about this in the docs of pathos/ppft, but from hints they contain here and there, it seems clear that this should be possible with their features. How do you do inter-process communication with pathos/ppft in combination with a ProcessPool?

The following demo code illustrates my approach. How could I send intermediate results to the main process, e.g. report the list of primes found so far every time its length is a multiple of 100?
#!/usr/bin/env python3

import pathos

def worker_func(limit):
    """
    Dummy task: Find the list of all prime numbers smaller than "limit".
    """
    return limit, [
        num for num in range(2, limit)
        if all(num % i != 0 for i in range(2, num))
    ]

pool = pathos.pools.ProcessPool()

jobs = []
jobs.append(pool.apipe(worker_func, 10000))
jobs.append(pool.apipe(worker_func, 15000))
jobs.append(pool.apipe(worker_func, 20000))

count_done_jobs = 0
while count_done_jobs < len(jobs):
    for job_idx, job in enumerate(jobs):
        if job is not None and job.ready():
            limit, primes = job.get()
            jobs[job_idx] = None
            count_done_jobs += 1
            print("Job {}: There are {} primes smaller than {}."
                  .format(job_idx, len(primes), limit))
Okay, I figured it out. I realized that pathos uses multiprocess for the ProcessPool, and multiprocess is a fork of the official Python module multiprocessing with only minor differences (mentioned here). So you can just do inter-process communication the multiprocessing way; an example is given in the Python docs, but using multiprocess.Pipe() instead of multiprocessing.Pipe().
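For reference, the basic pattern from the Python docs looks like this. It's a minimal sketch using the stdlib multiprocessing module; with pathos workers you would do "import multiprocess as mp" instead, since the API is the same:

```python
# Minimal parent/child Pipe example, following the multiprocessing docs.
# With pathos workers, swap this for "import multiprocess as mp".
import multiprocessing as mp

def child(conn):
    # Send a message through the child end of the pipe, then close it.
    conn.send([42, None, "hello"])
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=child, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # message sent by the child
    p.join()
```

The child keeps its end of the pipe and can call conn.send() as often as it likes while running; the parent reads with parent_conn.recv() or checks for pending messages first.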
Applying this to my example code:
#!/usr/bin/env python3

import pathos
import multiprocess as mp
import multiprocess.connection  # ensure mp.connection.wait() is available

WAIT_TIMEOUT_SECS = 1.0

def worker_func(worker_no, conn, limit):
    """
    Dummy task: Count all prime numbers smaller than "limit".
    """
    count = 0
    for num in range(2, limit):
        if all(num % i != 0 for i in range(2, num)):
            count += 1
        # Report progress to the main process now and then
        if num % (limit//10) == 0:
            percent_done = int(num/limit*100)
            conn.send([worker_no, percent_done, count, num])
    return limit, count

pool = pathos.pools.ProcessPool()

# Make workers; keep each worker's number alongside its job handle
workers, parent_pipes = [], []
for worker_no, limit in enumerate([50000, 80000, 100000]):
    parent_conn, child_conn = mp.Pipe()
    workers.append((worker_no, pool.apipe(
        worker_func,
        worker_no,
        child_conn,
        limit
    )))
    parent_pipes.append(parent_conn)

# Watch workers
while len(workers) > 0:
    # Collect done workers
    done_workers_indices = []
    for idx, (worker_no, job) in enumerate(workers):
        if job.ready():
            done_workers_indices.append(idx)
    # Handle done workers (delete in reverse order, so the remaining
    # list indices stay valid)
    for idx in sorted(done_workers_indices, reverse=True):
        worker_no, job = workers[idx]
        limit, count = job.get()
        print("Worker {} has completed: There are {} primes smaller than {}."
              .format(worker_no, count, limit))
        del workers[idx]
        del parent_pipes[idx]
    if not parent_pipes:
        break
    # Read progress reports
    conns_ready = mp.connection.wait(
        parent_pipes,
        WAIT_TIMEOUT_SECS
    )
    for conn in conns_ready:
        try:
            worker_no, percent_done, count, num = conn.recv()
            print("Worker {} is {}% done: There are {} primes smaller than {}."
                  .format(worker_no, percent_done, count, num))
        except EOFError:
            # There's nothing left to receive and the other end was closed.
            pass
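As an aside, mp.connection.wait() is not the only way to drain the pipes: each parent connection can also be polled individually with Connection.poll(), which takes an optional timeout. A minimal sketch of that alternative, where the read_progress helper is my own name, not part of multiprocess (shown with stdlib multiprocessing, which has the identical API):

```python
# Draining progress messages by polling each connection individually,
# as an alternative to mp.connection.wait(). The read_progress() helper
# is a hypothetical name, not part of multiprocessing/multiprocess.
import multiprocessing as mp

def read_progress(parent_pipes, timeout_secs=0.1):
    """Collect all currently pending messages from the given connections."""
    messages = []
    for conn in parent_pipes:
        # poll() returns True while a message is waiting to be received
        while conn.poll(timeout_secs):
            try:
                messages.append(conn.recv())
            except EOFError:
                break  # the other end was closed, nothing more to read
    return messages

if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    child_conn.send(["worker-0", 50])  # simulate a worker's progress report
    print(read_progress([parent_conn]))
```

Either way works; wait() is more efficient with many pipes because it blocks on all of them at once instead of checking them one after another.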