
One progress bar for a parallel job in Python


The loop runs over a number of models (n_mod), which are distributed among the n_cpu MPI processes. Running this code with mpirun -np 4 python test_mpi.py produces 4 progress bars, which is understandable because every process runs its own loop. Is there a way to use tqdm to get a single progress bar that shows how many models have been completed in total?

from tqdm import tqdm
from mpi4py import MPI
import time

comm = MPI.COMM_WORLD
cpu_ind = comm.Get_rank()
n_cpu = comm.Get_size()

n_mod=100
for i in tqdm(range(n_mod)):
    if (cpu_ind == int(i/int(n_mod/n_cpu))%n_cpu):
        #some task here which is a function of i
        time.sleep(0.02)
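To see why four bars appear: every rank wraps the full range(n_mod) in tqdm, so each bar counts to n_mod even though a rank only does real work for some values of i. The hypothetical helper tasks_for_rank below (not part of the original code) reproduces the if condition from the loop to list the indices a given rank actually processes, assuming n_mod and n_cpu as defined above.

```python
def tasks_for_rank(rank, n_mod, n_cpu):
    """Indices i for which the question's condition selects `rank`.

    Mirrors: cpu_ind == int(i / int(n_mod / n_cpu)) % n_cpu
    """
    block = int(n_mod / n_cpu)  # size of each contiguous block of indices
    return [i for i in range(n_mod) if rank == int(i / block) % n_cpu]


if __name__ == "__main__":
    n_mod, n_cpu = 100, 4
    for rank in range(n_cpu):
        mine = tasks_for_rank(rank, n_mod, n_cpu)
        # Each rank still iterates over (and draws a bar for) all n_mod
        # indices, but only does real work for len(mine) of them.
        print(rank, len(mine), mine[0], mine[-1])
```

With n_mod = 100 and 4 ranks, each rank gets a contiguous block of 25 indices. When n_mod is not a multiple of n_cpu (for example 3 ranks), the leftover indices wrap around to rank 0, which then gets 34 tasks.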

Solution

  • This can be achieved with a main node and worker node(s) setup.

    Only the rank 0 process updates the progress bar, while the worker processes simply notify the main node each time they complete a task.

    Worker:

    (Defined similarly to your above code)

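    from mpi4py import MPI
    from tqdm import tqdm
    import time
    
    def worker(n_mod, size, rank):
        comm = MPI.COMM_WORLD
        step = size - 1   # Number of worker processes (every rank except 0)
        start = rank - 1  # Workers are ranks 1..size-1; task indices start at 0
    
        for i in range(start, n_mod, step):
            time.sleep(0.02)  # Work for model i goes here
            # Notify the main node that one task is done
            comm.send('done', dest=0, tag=1)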
    

    Main Node:

    def pbar_node(n_mod, size):
    
        comm = MPI.COMM_WORLD
        pbar = tqdm(total=n_mod, desc="Processing Models")
        completed = 0
    
        while completed < n_mod:
            # Receive a message from any worker
            status = MPI.Status()
            comm.recv(source=MPI.ANY_SOURCE, tag=1, status=status)
            completed += 1
            pbar.update(1)
    
        pbar.close()
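The message-counting protocol between the workers and the main node does not depend on MPI itself. As an illustration only, the sketch below swaps comm.send/comm.recv for a standard-library queue.Queue shared by threads, so the logic can be exercised without mpirun. The names thread_worker and simulate are hypothetical, and a plain counter stands in for the tqdm bar.

```python
import queue
import threading


def thread_worker(rank, n_mod, n_workers, inbox):
    # Same strided split as the MPI worker: worker rank r (1-based) handles
    # indices r-1, r-1+n_workers, r-1+2*n_workers, ...
    for i in range(rank - 1, n_mod, n_workers):
        # ... the real work for model i would go here ...
        inbox.put(("done", rank, i))  # stands in for comm.send(..., dest=0)


def simulate(n_mod, n_workers):
    inbox = queue.Queue()  # stands in for the MPI channel into rank 0
    threads = [
        threading.Thread(target=thread_worker, args=(r, n_mod, n_workers, inbox))
        for r in range(1, n_workers + 1)
    ]
    for t in threads:
        t.start()

    completed = 0
    seen = []
    while completed < n_mod:
        # Like comm.recv(source=MPI.ANY_SOURCE): take whichever message arrives first
        _, _source, i = inbox.get()
        seen.append(i)
        completed += 1  # pbar.update(1) in the MPI version
    for t in threads:
        t.join()
    return completed, sorted(seen)


if __name__ == "__main__":
    done, indices = simulate(n_mod=100, n_workers=3)
    print(done, indices == list(range(100)))  # 100 True
```

The main loop only counts messages, so it finishes exactly when every task has reported in, regardless of which worker sent each message or in what order.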
    

    I have tested this with mpirun -np 4 python test.py and get a single progress bar for 100 tasks shared among 3 worker processes plus the main process, as shown in the screenshot.


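    When distributing tasks, remember that there are size - 1 worker processes plus one main node, so the workers should start counting from rank 1 and step by size - 1 to avoid off-by-one errors. This also means the job needs at least 2 processes (for example mpirun -np 2); with a single process there are no workers and the main node would wait forever.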

    You can use these like so:

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    
    n_mod = 100  # Total number of models (tasks) to process
    
    if rank == 0:
        # pbar process
        pbar_node(n_mod, size)
    else:
        # Worker processes
        worker(n_mod, size, rank)
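The off-by-one pitfall mentioned above can be checked without launching MPI. The hypothetical helper worker_indices below reproduces the worker's range(rank - 1, n_mod, size - 1) split and confirms that ranks 1 to size - 1 together cover every task exactly once while rank 0 gets none. It assumes size >= 2, since with one process the step would be zero and range would raise a ValueError.

```python
def worker_indices(rank, n_mod, size):
    """Task indices handled by `rank` under the strided split used by worker()."""
    if rank == 0:
        return []  # rank 0 only drives the progress bar
    step = size - 1   # number of worker processes
    start = rank - 1  # workers are ranks 1..size-1, tasks start at index 0
    return list(range(start, n_mod, step))


def check_partition(n_mod, size):
    all_tasks = [i for r in range(size) for i in worker_indices(r, n_mod, size)]
    # Every task must appear exactly once across all workers
    return sorted(all_tasks) == list(range(n_mod))


if __name__ == "__main__":
    size = 4  # as with: mpirun -np 4
    print([len(worker_indices(r, 100, size)) for r in range(size)])  # [0, 34, 33, 33]
    print(check_partition(100, size))  # True
```

By contrast, if the workers used range(rank, n_mod, size), the indices that would have belonged to rank 0 (0, 4, 8, ...) would never be processed, so pbar_node would receive only 75 of the 100 messages it waits for and the job would hang.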