Tags: python, joblib, tqdm

Running functions in parallel and seeing their progress


I am using joblib to run four processes on four cores in parallel. I would like to see the progress of the four processes separately on different lines. However, what I see is the progress bars being written on top of each other on the same line until the first process finishes.

from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time

def calc(n_digits):
    # number of iterations
    n = int(n_digits+1/14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits+1

    t    = Decimal(0)
    pi   = Decimal(0)
    deno = Decimal(0)

    for k in trange(n):
        t = ((-1)**k)*(factorial(6*k))*(13591409+545140134*k)
        deno = factorial(3*k)*(factorial(k)**3)*(640320**(3*k))
        pi += Decimal(t)/Decimal(deno)

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1/pi
    
    # no need to round
    return pi


def parallel_with_joblib():
    # Define the number of cores to use
    n_cores = 4

    # Define the tasks (the number of digits of pi to compute in each job)
    tasks = [1200, 1700, 900, 1400]

    # Run tasks in parallel
    results = Parallel(n_jobs=n_cores)(delayed(calc)(n) for n in tasks)


if __name__ == "__main__":
    parallel_with_joblib()

I would also like the four lines to be labelled "Job 1 of 4", "Job 2 of 4" etc.


Following the method of @Swifty, changing the number of cores to 3 and the number of tasks to 7, and switching leave=False to leave=True, I now have this code:

from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time


def calc(n_digits, pos, total):
    # number of iterations
    n = int(n_digits + 1 / 14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits + 1

    t = Decimal(0)
    pi = Decimal(0)
    deno = Decimal(0)

    for k in trange(n, position=pos, desc=f"Job {pos + 1} of {total}", leave=True):
        t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
        deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
        pi += Decimal(t) / Decimal(deno)

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1 / pi

    # no need to round
    return pi


def parallel_with_joblib():
    # Define the number of cores to use
    n_cores = 3

    # Define the tasks (the number of digits of pi to compute in each job)
    tasks = [1200, 1700, 900, 1400, 800, 600, 500]

    # Run tasks in parallel
    results = Parallel(n_jobs=n_cores)(delayed(calc)(n, pos, len(tasks)) for (pos, n) in enumerate(tasks))


if __name__ == "__main__":
    parallel_with_joblib()

I have changed it to leave=True as I don't want the blank lines that appear otherwise.

This, however, gives me:

[screenshot: the progress bars overwrite one another while running]

and then at the end it creates even more mess:

[screenshot: the output is even more garbled after the jobs finish]

How can this be fixed?


Solution

  • My idea was to create all the progress bars in the main process and to create a single multiprocessing queue that each pool process would have access to. Then, when calc completed an iteration, it would place on the queue an integer identifying its corresponding progress bar. The main process would continually get these integers from the queue and update the correct bar. Each calc instance would place a sentinel value on the queue to tell the main process that it had no more updates to enqueue.

    With a multiprocessing.pool.Pool instance we can use a "pool initializer" function to initialize a global variable queue in each pool process, which will be accessed by calc. Unfortunately, joblib provides no officially supported equivalent of a pool initializer. I tried various workarounds mentioned on the web, but none worked. So if you can live with not using joblib, then try this:

    from math import factorial
    from decimal import Decimal, getcontext
    from multiprocessing import Pool, Queue
    from tqdm import tqdm
    import time
    
    def init_pool(_queue):
        global queue
    
        queue = _queue
    
    def calc(n_digits, pos):
        # number of iterations
        n = int(n_digits + 1 / 14.181647462725477)
        n = n if n >= 1 else 1
    
        # set the number of digits for our numbers
        getcontext().prec = n_digits + 1
    
        t = Decimal(0)
        pi = Decimal(0)
        deno = Decimal(0)
    
        for k in range(n):
            t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
            deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
            pi += Decimal(t) / Decimal(deno)
            # Tell the main process to update the appropriate bar:
            queue.put(pos)
    
        pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
        pi = 1 / pi
    
        # no need to round
        queue.put(None)  # Let updater know we have no more updates
        return pi
    
    def parallel_with_pool():
        # Define the number of cores to use
        n_cores = 4
    
        # Define the tasks (the number of digits of pi to compute in each job)
        tasks = [1200, 1700, 900, 1400]  # Add values to make the code run longer
        n_tasks = len(tasks)
    
        queue = Queue()
    
        LEAVE_PROGRESS_BAR = False
    
        # Create the bars:
        pbars = [
            tqdm(total=tasks[idx],
                 position=idx,
                 desc=f"Job {idx + 1} of {n_tasks}",
                 leave=LEAVE_PROGRESS_BAR
                 )
            for idx in range(n_tasks)
        ]
    
        # Run tasks in parallel
        with Pool(n_cores, initializer=init_pool, initargs=(queue,)) as pool:
            # This doesn't block and allows us to retrieve items from the queue:
            async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))
    
            n = n_tasks
            while n:
                pos = queue.get()
                # Is this a sentinel value?
                if pos is None:
                    n -= 1  # One less task to await
                else:
                    pbars[pos].update()
    
            # We have no more updates to perform, so wait for the results:
            results = async_result.get()
    
            # Cause the bars to be removed before we display results
            # (See following Notes):
            for pbar in pbars:
                pbar.close()
            # So that the next print call starts at the start of the line
            # (required if leave=False is specified):
            if not LEAVE_PROGRESS_BAR:
                print('\r')
    
            for result in results:
                print(result)
    
    if __name__ == "__main__":
        parallel_with_pool()
    

    Notes

    In the above code the progress bars are instantiated with leave=False (via LEAVE_PROGRESS_BAR = False), signifying that we do not want the bars to remain once they complete. Consider the following code:

    from tqdm import tqdm
    import time
    
    with tqdm(total=10, leave=False) as pbar:
        for _ in range(10):
            pbar.update()
            time.sleep(.5)
    
    print('Done!')
    

    When the with block is terminated, the progress bar will disappear as a result of the implicit call to pbar.__exit__ that occurs. But if we had instead:

    pbar = tqdm(total=10, leave=False)
    for _ in range(10):
        pbar.update()
        time.sleep(.5)
    
    print('Done')
    

    We would see instead:

    C:\Ron\test>test.py
    100%|██████████████████████| 10/10 [00:04<00:00,  2.03it/s]Done
    

    Since the posted answer does not use the progress bars as context managers, the bars are not erased automatically, and if we then printed the actual results of our pi calculations we would have this problem. The solution is to explicitly call close() on each progress bar:

    ...
    def parallel_with_pool():
            ...
    
            # We have no more updates to perform, so wait for the results:
            results = async_result.get()
    
            # Cause the bars to be removed before we display results:
            for pbar in pbars:
                pbar.close()
            # So that the next print call starts at the start of the line
            # (required if leave=False is specified):
            print('\r')
    
            for result in results:
                print(result)
    

    If you want the progress bars to remain even after they have completed, then specify leave=True (i.e. set LEAVE_PROGRESS_BAR = True) as follows:

        pbars = [
            tqdm(total=tasks[idx],
                 position=idx,
                 desc=f"Job {idx + 1} of {n_tasks}",
                 leave=True
                 )
            for idx in range(n_tasks)
        ]
    

    It is no longer necessary to call close for each bar, but it does not hurt to do so.
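
    A tidy way to guarantee the close() calls even when an exception is raised is to enter every bar into a single contextlib.ExitStack, which closes them all when the block exits. A minimal sketch of the idea, independent of the pi code above:

    from contextlib import ExitStack
    from tqdm import tqdm
    import time

    with ExitStack() as stack:
        # Enter each bar as a context manager; ExitStack calls __exit__
        # (and hence close) on every bar when the block ends:
        pbars = [
            stack.enter_context(tqdm(total=20, position=idx, desc=f"Job {idx + 1} of 3"))
            for idx in range(3)
        ]
        for _ in range(20):
            time.sleep(.1)
            for pbar in pbars:
                pbar.update()

    print('Done!')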

    Update

    Instead of communicating over a multiprocessing.Queue instance, we can create a multiprocessing.Array instance (which uses shared memory) of N counters, where N is the number of progress bars being tracked. Every iteration of calc increments the appropriate counter, and the main process periodically (say, every .1 seconds) reads the counters and updates the progress bars accordingly:

    from math import factorial
    from decimal import Decimal, getcontext
    from multiprocessing import Pool, Array
    from tqdm import tqdm
    import time
    
    def init_pool(_progress_cntrs):
        global progress_cntrs
    
        progress_cntrs = _progress_cntrs
    
    def calc(n_digits, pos):
        # number of iterations
        n = int(n_digits + 1 / 14.181647462725477)
        n = n if n >= 1 else 1
    
        # set the number of digits for our numbers
        getcontext().prec = n_digits + 1
    
        t = Decimal(0)
        pi = Decimal(0)
        deno = Decimal(0)
    
        for k in range(n):
            t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
            deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
            pi += Decimal(t) / Decimal(deno)
            progress_cntrs[pos] += 1
    
        pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
        pi = 1 / pi
    
        return pi
    
    def parallel_with_pool():
        # Define the number of cores to use
        n_cores = 4
    
        # Define the tasks (the number of digits of pi to compute in each job)
        tasks = [1200, 1700, 900, 1400]  # Add values to make the code run longer
        n_tasks = len(tasks)
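        # Note: lock=False is safe here because each worker increments only
        # its own counter, and the main process overwrites a counter (with -1)
        # only after that worker has finished all of its increments.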
        progress_cntrs = Array('i', [0] * n_tasks, lock=False)
    
        LEAVE_PROGRESS_BAR = True
    
        # Create the bars:
        pbars = [
            tqdm(total=tasks[idx],
                 position=idx,
                 desc=f"Job {idx + 1} of {n_tasks}",
                 leave=LEAVE_PROGRESS_BAR
                 )
            for idx in range(n_tasks)
        ]
    
        # Run tasks in parallel
        with Pool(n_cores, initializer=init_pool, initargs=(progress_cntrs,)) as pool:
            # This doesn't block, which allows us to poll the counters:
            async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))
    
            n = n_tasks
            while n:
                time.sleep(.1)
    
                for idx in range(n_tasks):
                    ctr = progress_cntrs[idx]
                    if ctr != -1:
                        # This bar isn't complete
                        pbars[idx].n = ctr
                        pbars[idx].refresh()
                        if ctr == tasks[idx]:
                            # This bar is now complete
                            progress_cntrs[idx] = -1 # So we do not process this bar again
                            n -= 1
    
            # We have no more updates to perform, so wait for the results:
            results = async_result.get()
    
            # Cause the bars to be removed before we display results
            # (See following Notes):
            for pbar in pbars:
                pbar.close()
            # So that the next print call starts at the start of the line
            # (required if leave=False is specified)
            if not LEAVE_PROGRESS_BAR:
                print('\r')
    
            for result in results:
                print(result)
    
    if __name__ == '__main__':
        parallel_with_pool()
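
    Finally, if you would rather keep the bars in the worker processes themselves, as in the original question, tqdm's documentation shows a multiprocessing pattern in which one lock is shared with every worker so that the bars do not write over each other. A sketch of that pattern follows; the work function is a stand-in for the question's calc:

    from multiprocessing import Pool, RLock, freeze_support
    from tqdm import tqdm, trange
    import time

    def work(n_digits, pos, total):
        # Each worker draws its own bar; the shared lock serializes the
        # terminal writes so the bars stay on their own lines:
        for _ in trange(n_digits, position=pos,
                        desc=f"Job {pos + 1} of {total}", leave=True):
            time.sleep(.001)

    def parallel_with_tqdm_lock():
        tasks = [1200, 1700, 900, 1400]
        n_tasks = len(tasks)
        tqdm.set_lock(RLock())  # Create the shared lock in the main process
        with Pool(4, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as pool:
            pool.starmap(work, [(n, pos, n_tasks) for pos, n in enumerate(tasks)])

    if __name__ == '__main__':
        freeze_support()  # Needed for frozen executables on Windows
        parallel_with_tqdm_lock()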