python jupyter progress-bar dask multicore

Python multicore processing with Dask progress bar not showing


I have a database table 'Sensor_Data' with sensor data from different locations, containing the columns id, timestamp and value. Each id corresponds to a sensor at a particular location.

I am trying to use multicore processing with Dask to compute results for two different algorithms and save the results in Parquet files, one per sensor.

The code below runs fine on multiple cores; however, I can't get the progress bar to display in a Jupyter notebook.

I have omitted the real get_data, compute_algo1 and compute_algo2 functions, as they just load data and perform simple calculations on the DataFrame rows; placeholder stubs are included in the code below so the example runs end to end.

import os
import pandas as pd
import dask
from datetime import datetime
from dask.distributed import Client, LocalCluster
from dask import delayed, bag
from dask import dataframe as dd
from tqdm.auto import tqdm
from dask.diagnostics import ProgressBar

id_list = [1,2,3,4,5 ..... 10000]
out_folder = 'output' + os.path.sep
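
# Hypothetical stand-ins for the omitted get_data / compute_algo1 /
# compute_algo2 functions, added only so the example runs end to end;
# the real versions query the database and apply the actual algorithms
def get_data(pid):
    idx = pd.date_range('2023-01-01', periods=10, freq='min')
    return pd.DataFrame({'value': range(10)}, index=idx)

def compute_algo1(row):
    return row.value * 2

def compute_algo2(row):
    return row.value + 1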

def compute_parallel_dask(pid, out_folder):
    df_data = get_data(pid)
    fname = out_folder + f'{pid}.parquet'
    if not os.path.exists(fname):
        # Compute both algorithms row by row and collect the results
        df_computed = pd.DataFrame(columns=['DateTime', 'raw_value', 'algo1_value', 'algo2_value'])
        for indx, row in df_data.iterrows():
            raw_value = row.value
            algo1_val = compute_algo1(row)
            algo2_val = compute_algo2(row)
            df_computed.loc[len(df_computed)] = [pd.to_datetime(indx), raw_value, algo1_val, algo2_val]
        df_computed.to_parquet(fname, index=False)
    else:
        # Results for this sensor already exist; reuse the cached file
        df_computed = pd.read_parquet(fname)
    return dd.from_pandas(df_computed, npartitions=1)

results = []
num_cores = 6

folder_list = [out_folder for pid in id_list]

# Use dask.bag to parallelize the function calls with multiple arguments
b = bag.from_sequence(zip(id_list, folder_list)).map(lambda x: compute_parallel_dask(*x))

# Set up the local Dask cluster and client
cluster = LocalCluster(n_workers=num_cores, scheduler_port=0)
client = Client(cluster)
# Register dask.diagnostics.ProgressBar to display the progress bar
dask.diagnostics.ProgressBar().register()

# Use tqdm to manually display the progress bar
for result in tqdm(b.compute(), desc="Processing"):
    results.append(result)
# Since the computations are now executed, the Parquet files should have been written
print('All Done')

Any help in fixing this would be greatly appreciated. Thanks!


Solution

  • See https://docs.dask.org/en/stable/diagnostics-distributed.html#progress-bar:

    The dask.distributed progress bar differs from the ProgressBar used for local diagnostics. The progress function takes a Dask object that is executing in the background.

    Since you created a Client, you are running on the distributed scheduler, so the dask.diagnostics.ProgressBar you registered never fires; it only works with the local (threaded and multiprocessing) schedulers. Use the dask.distributed.progress function described at the link instead.
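
    A minimal sketch of that approach, reusing the bag b, num_cores and the cluster setup from your question (everything else unchanged): submit the work in the background with client.compute, hand the resulting future to progress, then gather the results.

    from dask.distributed import Client, LocalCluster, progress

    cluster = LocalCluster(n_workers=num_cores, scheduler_port=0)
    client = Client(cluster)

    # client.compute() starts the computation in the background and returns
    # a Future, which is the kind of "executing in the background" object
    # that progress() expects
    future = client.compute(b)

    # Renders a live progress bar in the notebook while the workers run
    progress(future)

    # Block until the computation finishes and collect the results
    results = client.gather(future)
    print('All Done')

    Alternatively, the Dask dashboard (its link is shown in the Client repr) displays the same progress information live.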