Hi all,
I am struggling to limit the number of parallel executions in the Python code below using multiprocessing, in particular Queue(), Manager() and Pool().
My understanding was that multiprocessing.Pool(processes=2) would result in two workers running in parallel while the other queued workers are "on hold" until one of the two running jobs finishes.
No matter what I tried, all the workers run in parallel in the example below. To make my expectations clear, assume the script's defaults: 10 hosts, 2 pool processes, and 10 seconds of simulated work per host. With only two workers active at a time I would expect a total runtime of roughly 50 seconds (5 batches of 2 workers, 10 seconds each).
But the actual runtime of the script below is around 10 seconds, which means all 10 workers were executed in parallel even though Pool() was created with processes=2.
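(For reference, a quick sketch of the batch arithmetic behind that 50-second expectation; the numbers are simply the defaults of the script below:)

import math

num_hosts, num_proc, sleep_time = 10, 2, 10      # defaults of the test script below
batches = math.ceil(num_hosts / num_proc)        # 5 batches of 2 workers each
print(f'Expected runtime: ~{batches * sleep_time} s')  # ~50 s if the 2-worker limit actually holds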
Any hints on what I am doing wrong below?
#!/usr/bin/env python3
#
#
'''
Script to test how I can limit the number of parallel executions
'''
import sys
import time
#import random
import argparse
from multiprocessing import Process, Pool, Manager, cpu_count
parser = argparse.ArgumentParser(
    prog='mp-test.py',
    description='Try to limit number of parallel processes'
)
parser.add_argument('-nh', '--num-hosts', dest="num_hosts", type=int, default=10,
                    help='Number of hosts to ssh to (default 10).')
parser.add_argument('-np', '--num-proc', dest="num_proc", type=int, default=2,
                    help='Number of processes to spawn (default 2).')
parser.add_argument('-st', '--sleep-time', dest="sleep_time", type=int, default=10,
                    help='Time of simulated work in seconds (default 10).')
args = parser.parse_args()
num_p = args.num_proc
num_h = args.num_hosts
num_s = args.sleep_time
def worker(index, queue):
    '''
    Do some fake work
    '''
    print(f'SSH to host {index} and grab some data')
    tw_start = time.time()
    #fake_work_time = num_s + random.randrange(0, 10, 1)
    fake_work_time = num_s
    time.sleep(fake_work_time)
    tw_stop = time.time()
    tw_elapsed = tw_stop - tw_start
    result = [index + 1000, tw_elapsed]
    print(f'Result from host {index} is {result[0]} and was returned after {tw_elapsed} seconds')
    queue.put(result)
def reader(proc_q):
    '''
    Get the result of the fake work from the Queue()
    '''
    message = proc_q.get()
    return message
def run_par():
    '''
    Run "worker" in parallel but limit the number of parallel executions
    to not overload the host running mp-test.py
    '''
    objs = {}
    procs = []
    mngr = Manager()
    proc_q = mngr.Queue()
    proc_p = Pool(processes=num_p)
    try:
        for host in range(num_h):
            proc = Process(target=worker, args=(host, proc_q))
            procs.append(proc)
            proc.start()
        readers = []
        for proc in procs:
            readers.append(proc_p.apply_async(reader, (proc_q,)))
        proc_p.close()
        proc_p.join()
        for enum, rdr in enumerate(readers):
            ret = rdr.get()  # blocking
            objs.update({enum: ret})
    except Exception as exn:
        print("Something went wrong, doing something else instead")
        print(exn)
        sys.exit()
    return objs
if __name__ == "__main__":
    print(f'Parallel processes: {num_p}')
    print(f'Hosts to query: {num_h}')
    print(f'Fake work time (s): {num_s}')
    if num_h < num_p:
        est_runtime = round(num_s * num_h / num_h, 1)
    else:
        est_runtime = round(num_s * num_h / num_p, 1)
    print(f'Expected runtime (s): {est_runtime}\n')
    tm_start = time.time()
    results = run_par()
    tm_stop = time.time()
    tm_elapsed = tm_stop - tm_start
    print("\nResults:\n", results)
    print(f'\nActual runtime: {tm_elapsed}')
Thanks a lot for any useful feedback :)
After implementing the solution suggested by @AKX in my actual code (glljobstat.py), here is an example of the output and the runtime with an increasing number of processes:
1 process:
# time ./glljobstat.py -c 3 -n 1 -np 1
SSH time : 10.126535177230835
Parser time : 2.58267879486084
---
timestamp: 1693775799
servers_queried: 8
osts_queried: 24
mdts_queried: 8
total_jobs: 2129
top_3_jobs:
- 4601677@92097@comp0338: {ops: 798843135, op: 96522303, cl: 294159636, mn: 26049471, ul: 26049251, mk: 15, mv: 11979331, ga: 122437650, sa: 47444116, gx: 47174314, sx: 34, sy: 8308883, rd: 10056273, wr: 18854594, pu: 89807264}
- 4662864@92541@comp0602: {ops: 796758014, op: 11423, cl: 793269924, mn: 1373, ul: 29, mk: 5, rm: 5, mv: 1344, ga: 3120625, sa: 8006, gx: 7673, sy: 38838, rd: 296031, wr: 1653, pu: 1085}
- 4601786@92097@comp0338: {ops: 700962859, op: 84562383, cl: 256040408, mn: 22793979, ul: 22793371, mk: 15, mv: 10434189, ga: 108451480, sa: 41365212, gx: 42202203, sx: 34, sy: 7305240, rd: 9320137, wr: 17080545, pu: 78613663}
...
real 0m15.757s
user 0m6.796s
sys 0m0.630s
2 processes:
# time ./glljobstat.py -c 3 -n 1 -np 2
SSH time : 5.219966888427734
Parser time : 1.4718618392944336
---
timestamp: 1693775820
servers_queried: 8
osts_queried: 24
mdts_queried: 8
total_jobs: 2136
top_3_jobs:
- 4601677@92097@comp0338: {ops: 798852530, op: 96523769, cl: 294162512, mn: 26050049, ul: 26049393, mk: 15, mv: 11979443, ga: 122439542, sa: 47444536, gx: 47175247, sx: 34, sy: 8309110, rd: 10056273, wr: 18854648, pu: 89807959}
- 4662864@92541@comp0602: {ops: 796783205, op: 11423, cl: 793294979, mn: 1373, ul: 29, mk: 5, rm: 5, mv: 1344, ga: 3120761, sa: 8006, gx: 7673, sy: 38838, rd: 296031, wr: 1653, pu: 1085}
- 4601786@92097@comp0338: {ops: 700976193, op: 84564172, cl: 256044680, mn: 22794240, ul: 22793650, mk: 15, mv: 10434359, ga: 108454243, sa: 41365909, gx: 42203314, sx: 34, sy: 7305240, rd: 9320137, wr: 17080638, pu: 78615562}
...
real 0m8.727s
user 0m6.981s
sys 0m0.626s
10 processes:
# time ./glljobstat.py -c 3 -n 1 -np 10
SSH time : 1.3297054767608643
Parser time : 0.8704047203063965
---
timestamp: 1693775833
servers_queried: 8
osts_queried: 24
mdts_queried: 8
total_jobs: 2139
top_3_jobs:
- 4601677@92097@comp0338: {ops: 798861261, op: 96525096, cl: 294165479, mn: 26050201, ul: 26049545, mk: 15, mv: 11979595, ga: 122441619, sa: 47445080, gx: 47176101, sx: 34, sy: 8309110, rd: 10056273, wr: 18854671, pu: 89808442}
- 4662864@92541@comp0602: {ops: 796800206, op: 11423, cl: 793311876, mn: 1373, ul: 29, mk: 5, rm: 5, mv: 1344, ga: 3120865, sa: 8006, gx: 7673, sy: 38838, rd: 296031, wr: 1653, pu: 1085}
- 4601786@92097@comp0338: {ops: 700977232, op: 84564273, cl: 256044980, mn: 22794254, ul: 22793774, mk: 15, mv: 10434359, ga: 108454492, sa: 41365932, gx: 42203354, sx: 34, sy: 7305240, rd: 9320137, wr: 17080669, pu: 78615719}
...
real 0m3.254s
user 0m7.537s
sys 0m0.994s
20 processes (max cores available):
# time ./glljobstat.py -c 3 -n 1 -np 20
SSH time : 0.846405029296875
Parser time : 0.965083122253418
---
timestamp: 1693775843
servers_queried: 8
osts_queried: 24
mdts_queried: 8
total_jobs: 2139
top_3_jobs:
- 4601677@92097@comp0338: {ops: 798870129, op: 96526382, cl: 294168352, mn: 26050351, ul: 26049695, mk: 15, mv: 11979745, ga: 122443623, sa: 47445609, gx: 47176932, sx: 34, sy: 8309110, rd: 10056273, wr: 18855044, pu: 89808964}
- 4662864@92541@comp0602: {ops: 796814989, op: 11423, cl: 793326576, mn: 1373, ul: 29, mk: 5, rm: 5, mv: 1344, ga: 3120948, sa: 8006, gx: 7673, sy: 38838, rd: 296031, wr: 1653, pu: 1085}
- 4601786@92097@comp0338: {ops: 700982754, op: 84564886, cl: 256046406, mn: 22794581, ul: 22794165, mk: 15, mv: 10434375, ga: 108455502, sa: 41366123, gx: 42203768, sx: 34, sy: 7305614, rd: 9320140, wr: 17080690, pu: 78616455}
...
real 0m2.892s
user 0m8.016s
sys 0m1.344s
If you're using a Pool, you shouldn't create Processes by hand. In your run_par(), every Process you start() runs immediately and is never throttled by the pool; only the cheap reader() calls are submitted to the pool, which is why all 10 workers finish after roughly one sleep interval.
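As a quick illustration (a minimal sketch stripped down from your script, not code taken from it):

from multiprocessing import Pool, Process
import time

def slow(i):
    time.sleep(1)

if __name__ == "__main__":
    pool = Pool(processes=2)                 # created but never given any work
    procs = [Process(target=slow, args=(i,)) for i in range(10)]
    t0 = time.time()
    for p in procs:
        p.start()                            # all 10 start immediately; the pool cannot throttle them
    for p in procs:
        p.join()
    print(f"elapsed: {time.time() - t0:.1f}s")   # ~1 s, not the ~5 s a 2-worker pool would give
    pool.close()
    pool.join()

The pool's limit only applies to work submitted to the pool itself (apply_async, map, imap_unordered, ...).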
The parallel work in your program simplifies to
def run_par():
    work = range(num_h)
    with multiprocessing.Pool(processes=num_p) as p:
        return dict(p.imap_unordered(worker, work))
e.g.
import multiprocessing
import time

num_p = 2
num_h = 10
num_s = 1


def worker(index):
    """
    Do some fake work
    """
    print(f"SSH to host {index} and grab some data")
    tw_start = time.time()
    fake_work_time = num_s
    time.sleep(fake_work_time)
    tw_stop = time.time()
    tw_elapsed = tw_stop - tw_start
    result = [index + 1000, tw_elapsed]
    print(f"Result from host {index} is {result[0]} and was returned after {tw_elapsed} seconds")
    return (index, result)


def run_par():
    work = range(num_h)
    with multiprocessing.Pool(processes=num_p) as p:
        return dict(p.imap_unordered(worker, work))


if __name__ == "__main__":
    print(f"Parallel processes: {num_p}")
    print(f"Hosts to query: {num_h}")
    print(f"Fake work time (s): {num_s}")
    if num_h < num_p:
        est_runtime = round(num_s * num_h / num_h, 1)
    else:
        est_runtime = round(num_s * num_h / num_p, 1)
    print(f"Expected runtime (s): {est_runtime}\n")
    tm_start = time.time()
    results = run_par()
    tm_stop = time.time()
    tm_elapsed = tm_stop - tm_start
    print("\nResults:\n", results)
    print(f"\nActual runtime: {tm_elapsed}")
prints out
Parallel processes: 2
Hosts to query: 10
Fake work time (s): 1
Expected runtime (s): 5.0
SSH to host 0 and grab some data
SSH to host 1 and grab some data
Result from host 0 is 1000 and was returned after 1.0009031295776367 seconds
SSH to host 2 and grab some data
Result from host 1 is 1001 and was returned after 1.0009031295776367 seconds
SSH to host 3 and grab some data
Result from host 2 is 1002 and was returned after 1.0006415843963623 seconds
SSH to host 4 and grab some data
Result from host 3 is 1003 and was returned after 1.0006437301635742 seconds
SSH to host 5 and grab some data
Result from host 4 is 1004 and was returned after 1.0007638931274414 seconds
SSH to host 6 and grab some data
Result from host 5 is 1005 and was returned after 1.0007636547088623 seconds
SSH to host 7 and grab some data
Result from host 6 is 1006 and was returned after 1.0010120868682861 seconds
SSH to host 8 and grab some data
Result from host 7 is 1007 and was returned after 1.0010101795196533 seconds
SSH to host 9 and grab some data
Result from host 8 is 1008 and was returned after 1.0006747245788574 seconds
Result from host 9 is 1009 and was returned after 1.0006749629974365 seconds
Results:
{0: [1000, 1.0009031295776367], 1: [1001, 1.0009031295776367], 2: [1002, 1.0006415843963623], 3: [1003, 1.0006437301635742], 4: [1004, 1.0007638931274414], 5: [1005, 1.0007636547088623], 6: [1006, 1.0010120868682861], 7: [1007, 1.0010101795196533], 8: [1008, 1.0006747245788574], 9: [1009, 1.0006749629974365]}
Actual runtime: 5.106996536254883
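One note on the example above: imap_unordered hands results back in completion order, not submission order; since worker returns (index, result) pairs that end up in a dict, that doesn't matter here. If you want results in input order while keeping the same num_p limit, Pool.map preserves it (same names as in the example above):

def run_par():
    work = range(num_h)
    with multiprocessing.Pool(processes=num_p) as p:
        return dict(p.map(worker, work))    # same 2-worker limit, results in input order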