dask, dask-distributed, dask-delayed, dask-dataframe

Display progress on dask.compute(*something) call


My code using Dask has the following structure:

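# assumed imports (not shown in the original post)
import dask
from tqdm import notebook

@dask.delayed
def calculate(data):
    services = data.service_id
    prices = data.price

    return [services, prices]

output = []

# build one delayed task per quotation id; nothing is computed yet
for qid in notebook.tqdm(ids):
    r = calculate(parts[parts.quotation_id == qid])
    output.append(r)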

It turns out that when I call dask.compute() on my output list, I get no progress indication. The Diagnostic UI doesn't "capture" this action, and I'm not even sure it's running properly (judging by my processor usage, I think it's not).

result = dask.compute(*output)

I'm following the "best practices" article from the Dask documentation:

https://docs.dask.org/en/latest/delayed-best-practices.html

What am I missing?

Edit: I think it's running, because I still get memory leak/high usage warnings. There is still no progress indication.


Solution

  • As pointed out in the related post, dask has two ways to display progress: dask.diagnostics.ProgressBar for "normal" dask (the local schedulers), and dask.distributed.progress for dask.distributed.

    Here's a reproducible example:

    import random
    from time import sleep

    import dask
    from dask.diagnostics import ProgressBar
    from dask.distributed import Client, progress

    # simulate work: each task sleeps for x seconds
    @dask.delayed
    def work(x):
        sleep(x)
        return True

    # generate 50 tasks with reproducible random durations of 1 to 5 seconds
    random.seed(42)
    tasks = [work(random.randint(1, 5)) for _ in range(50)]
    

    Using plain dask

    ProgressBar().register()
    dask.compute(*tasks)
    

    produces:

    [screenshot of the progress bar]

    Using dask.distributed

    client = Client()
    futures = client.compute(tasks)
    
    progress(futures)
    

    produces:

    [screenshot of the progress bar]
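
For reference, both approaches can be condensed into one short script. This is only a sketch under a few assumptions that are not in the answer above: the sleep is shortened so it finishes quickly, ProgressBar is used as a context manager instead of being registered globally, the distributed client runs in-process with the dashboard disabled (processes=False, dashboard_address=None), and progress is forced to text mode with notebook=False. It also adds the client.gather step, which the example above leaves out, to actually retrieve the results.

```python
import time

import dask
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress


@dask.delayed
def work(x):
    time.sleep(0.01)  # stand-in for real work
    return x * 2


tasks = [work(i) for i in range(20)]

# 1) Local scheduler: the bar is only active inside the `with` block,
#    so other computations in the session are not affected.
with ProgressBar():
    local_results = dask.compute(*tasks)  # returns a tuple

# 2) Distributed scheduler. The in-process cluster without a dashboard is an
#    assumption made to keep this sketch lightweight; plain Client() also works.
client = Client(processes=False, dashboard_address=None)

futures = client.compute(tasks)     # returns a list of futures immediately
progress(futures, notebook=False)   # text progress bar, waits for completion
distributed_results = client.gather(futures)  # list of results

client.close()
```

In a Jupyter notebook, dropping notebook=False lets progress pick the widget display on its own. Note that once a Client exists it becomes the default scheduler, and dask.diagnostics.ProgressBar no longer reports on computations sent to it, which is one likely reason for seeing no bar at all.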