I am using joblib to run four processes on four cores in parallel. I would like to see the progress of the four processes separately on different lines. However, what I see is the progress being written on top of each other to the same line until the first process finishes.
from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time
def calc(n_digits):
# number of iterations
n = int(n_digits+1/14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits+1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in trange(n):
t = ((-1)**k)*(factorial(6*k))*(13591409+545140134*k)
deno = factorial(3*k)*(factorial(k)**3)*(640320**(3*k))
pi += Decimal(t)/Decimal(deno)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1/pi
# no need to round
return pi
def parallel_with_joblib():
# Define the number of cores to use
n_cores = 4
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400]
# Run tasks in parallel
results = Parallel(n_jobs=n_cores)(delayed(calc)(n) for n in tasks)
if __name__ == "__main__":
parallel_with_joblib()
I would also like the four lines to be labelled "Job 1 of 4", "Job 2 of 4" etc.
Following the method of @Swifty and changing the number of cores to 3 and the number of tasks to 7 and changing leave=False to leave=True I have this code:
from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time
def calc(n_digits, pos, total):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in trange(n, position=pos, desc=f"Job {pos + 1} of {total}", leave=True):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# no need to round
return pi
def parallel_with_joblib():
# Define the number of cores to use
n_cores = 3
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400, 800, 600, 500]
# Run tasks in parallel
results = Parallel(n_jobs=n_cores)(delayed(calc)(n, pos, len(tasks)) for (pos, n) in enumerate(tasks))
if __name__ == "__main__":
parallel_with_joblib()
I have change it to leave=True as I don't want the blank lines that appear otherwise.
This however gives me:
and then at the end it creates even more mess:
How can this be fixed?
My idea was to create all the task bars in the main process and to create a single multiprocessing queue that each pool process would have access to. Then when calc
completed an iteration it would place on the queue an integer representing its corresponding task bar. The main process would continue to get these integers from the queue and update the correct task bar. Each calc
instance would place a sentinel value on the queue telling the main process that it had no more updates to enqueue.
With a multiprocessing.pool.Pool
instance we can use a "pool initializer" function to initialize a global variable queue
in each pool process, which will be accessed by calc
. Unfortunately, joblib
provides no authorized equivalent pool initializer. I tried various workarounds mentioned on the web, but none worked. So if you can live with not using joblib
, then try this:
from math import factorial
from decimal import Decimal, getcontext
from multiprocessing import Pool, Queue
from tqdm import tqdm
import time
def init_pool(_queue):
global queue
queue = _queue
def calc(n_digits, pos):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in range(n):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
# Tell the main process to update the appropriate bar:
queue.put(pos)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# no need to round
queue.put(None) # Let updater know we have no more updates
return pi
def parallel_with_pool():
# Define the number of cores to use
n_cores = 4
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400] # Edit to make code for longer
n_tasks = len(tasks)
queue = Queue()
LEAVE_PROGRESS_BAR = False
# Create the bars:
pbars = [
tqdm(total=tasks[idx],
position=idx,
desc=f"Job {idx + 1} of {n_tasks}",
leave=LEAVE_PROGRESS_BAR
)
for idx in range(n_tasks)
]
# Run tasks in parallel
with Pool(n_cores, initializer=init_pool, initargs=(queue,)) as pool:
# This doesn't block and allows us to retrieve items from the queue:
async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))
n = n_tasks
while n:
pos = queue.get()
# Is this a sentinel value?
if pos is None:
n -= 1 # One less task to await
else:
pbars[pos].update()
# We have no more updates to perform, so wait for the results:
results = async_result.get()
# Cause the bars to be removed before we display results
# (See following Notes):
for pbar in pbars:
pbar.close()
# So that the next print call starts at the start of the line
# (required if leave=False is specified):
if not LEAVE_PROGRESS_BAR:
print('\r')
for result in results:
print(result)
if __name__ == "__main__":
parallel_with_pool()
Notes
In the above code the progress bars are instantiated with the argument leave=False signifying that we do not want the bars to remain. Consider the following code:
from tqdm import tqdm
import time
with tqdm(total=10, leave=False) as pbar:
for _ in range(10):
pbar.update()
time.sleep(.5)
print('Done!')
When the with
block is terminated, the progress bar will disappear as a result of the implicit call to pbar.__exit__
that occurs. But if we had instead:
pbar = tqdm(total=10, leave=False)
for _ in range(10):
pbar.update()
time.sleep(.5)
print('Done')
We would see instead:
C:\Ron\test>test.py
100%|██████████████████████| 10/10 [00:04<00:00, 2.03it/s]Done
Since, in the posted answer we are not using the progress bar as context manager the progress bar are not immediately erased and if we had a print statement to output the actual results of our PI calculations, we would have the problem. The solution is to explicitly call close()
on each progress bar:
...
def parallel_with_pool():
...
# We have no more updates to perform, so wait for the results:
results = async_result.get()
# Cause the bars to be removed before we display results.
for pbar in pbars:
pbar.close()
# So that the next print call starts at the start of the line
# (required if leave=False is specified):
print('\r')
for result in results:
print(result)
If you want the progress bars to remain even after they have completed, then specify leave=True as follows:
pbars = [
tqdm(total=tasks[idx],
position=idx,
desc=f"Job {idx + 1} of {n_tasks}",
leave=True
)
for idx in range(n_tasks)
]
It is no longer necessary to call close
for each bar, but it does not hurt to do so.
Update
Instead of using a multiprocessing.Queue
instance to communicate we can instead create a multiprocessing.Array
instance (which uses shared memory) of N counters where N is the number of progress bars whose progress is being tracked. Every iteration of calc
will include an increment of the appropriate counter. The main process now has to periodically (say every .1 seconds) check the counters and update the progress bar accordingly:
from math import factorial
from decimal import Decimal, getcontext
from multiprocessing import Pool, Array
from tqdm import tqdm
import time
def init_pool(_progress_cntrs):
global progress_cntrs
progress_cntrs = _progress_cntrs
def calc(n_digits, pos):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in range(n):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
progress_cntrs[pos] += 1
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
return pi
def parallel_with_pool():
# Define the number of cores to use
n_cores = 4
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400] # Edit to make code for longer
n_tasks = len(tasks)
progress_cntrs = Array('i', [0] * n_tasks, lock=False)
LEAVE_PROGRESS_BAR = True
# Create the bars:
pbars = [
tqdm(total=tasks[idx],
position=idx,
desc=f"Job {idx + 1} of {n_tasks}",
leave=LEAVE_PROGRESS_BAR
)
for idx in range(n_tasks)
]
# Run tasks in parallel
with Pool(n_cores, initializer=init_pool, initargs=(progress_cntrs,)) as pool:
# This doesn't block and allows us to retrieve items form the queue:
async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))
n = n_tasks
while n:
time.sleep(.1)
for idx in range(n_tasks):
ctr = progress_cntrs[idx]
if ctr != -1:
# This bar isn't complete
pbars[idx].n = ctr
pbars[idx].refresh()
if ctr == tasks[idx]:
# This bar is now complete
progress_cntrs[idx] = -1 # So we do not process this bar again
n -= 1
# We have no more updates to perform, so wait for the results:
results = async_result.get()
# Cause the bars to be removed before we display results
# (See following Notes):
for pbar in pbars:
pbar.close()
# So that the next print call starts at the start of the line
# (required if leave=False is specified)
if not LEAVE_PROGRESS_BAR:
print('\r')
for result in results:
print(result)
if __name__ == '__main__':
parallel_with_pool()