I have the following structure in my code using Dask:
@dask.delayed
def calculate(data):
    services = data.service_id
    prices = data.price
    return [services, prices]

output = []
for qid in notebook.tqdm(ids):
    r = calculate(parts[parts.quotation_id == qid])
    output.append(r)
It turns out that when I call dask.compute() on my output list, I get no progress indication. The Diagnostic UI doesn't "capture" this action, and I'm not even sure it's running properly (judging by my processor usage, I think it's not).
result = dask.compute(*output)
I'm following the "best practices" article from the Dask documentation:
https://docs.dask.org/en/latest/delayed-best-practices.html
What am I missing?
Edit: I think it is running, because I still get memory leak/high usage warnings. There is still no progress indication, though.
As pointed out in the related post, dask has two methods for displaying progress: one for "normal" dask, and one for dask.distributed.
Here's a reproducible example:
import random
from time import sleep

import dask
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress

# simulate work
@dask.delayed
def work(x):
    sleep(x)
    return True

# generate tasks
random.seed(42)
tasks = [work(random.randint(1, 5)) for _ in range(50)]
dask
ProgressBar().register()
dask.compute(*tasks)
produces:
dask.distributed
client = Client()
futures = client.compute(tasks)
progress(futures)
produces:
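
Applied to the structure in the question, a minimal sketch could look like the following. Note that a tqdm bar around the loop only tracks how fast the delayed objects are created, since calling a delayed function is lazy; the actual work happens inside dask.compute(). The `parts` DataFrame and its values here are made-up stand-ins for the asker's data, and the ProgressBar is used as a context manager so it applies only to this one call instead of being registered globally.

```python
import dask
import pandas as pd
from dask.diagnostics import ProgressBar

# Hypothetical stand-in for the asker's `parts` DataFrame (values are invented).
parts = pd.DataFrame({
    "quotation_id": [1, 1, 2, 3, 3],
    "service_id": [10, 11, 12, 13, 14],
    "price": [5.0, 7.5, 3.0, 9.0, 1.5],
})
ids = parts.quotation_id.unique()

@dask.delayed
def calculate(data):
    # Return plain Python lists for each group.
    return [data.service_id.tolist(), data.price.tolist()]

# Building the task list is lazy: nothing is executed in this loop.
output = [calculate(parts[parts.quotation_id == qid]) for qid in ids]

# dask.diagnostics.ProgressBar works with the local (non-distributed) schedulers.
with ProgressBar():
    result = dask.compute(*output)
```

With a dask.distributed Client, the same `output` list can instead be passed to client.compute(output), followed by progress(futures) as in the example above; client.gather(futures) then collects the results.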