
I have the following structure in my code using Dask:

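import dask
from tqdm import notebook  # assumed import behind notebook.tqdm

@dask.delayed
def calculate(data):
    # Pull out the two columns of interest for one quotation
    services = data.service_id
    prices = data.price

    return [services, prices]

output = []

# Build one lazy task per quotation id; nothing runs until dask.compute()
for qid in notebook.tqdm(ids):
    r = calculate(parts[parts.quotation_id == qid])
    output.append(r)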

It turns out that when I call dask.compute() on my output list, I get no progress indication. The diagnostic UI doesn't "capture" this action, and I'm not even sure it's running properly (judging by my processor usage, I think it's not).

result = dask.compute(*output)

I'm following the "best practices" article from the Dask documentation:

https://docs.dask.org/en/latest/delayed-best-practices.html

What am I missing?

Edit: I think it is running, because I still get memory leak/high usage warnings. There is still no progress indication.

Jorge Nachtigall
  • Did you see this post: https://stackoverflow.com/questions/49039750/how-to-see-progress-of-dask-compute-task – Val Jan 15 '21 at 08:43
  • @Val Yup! The accepted answer even says that you can see the progress on the dashboard (which doesn't happen). Note that this problem only occurs when I use the syntax in the example I provided (it's neither incorrect nor bad practice). Thanks. – Jorge Nachtigall Jan 15 '21 at 12:55
  • So you're using "local" dask (i.e. not distributed) in a Jupyter Notebook? **Edit**: Why do you use `notebook.tqdm`? – Val Jan 15 '21 at 13:07
  • @Val Yes, I'm running locally in a Jupyter notebook. Did you open the documentation link that I posted in the question? `notebook.tqdm` is just there to give some info about the loop; it doesn't change anything. – Jorge Nachtigall Jan 15 '21 at 16:15
  • Yes, I'm familiar with it. I've posted an answer that creates progress bars for both local and distributed dask tasks. I had to create a reproducible example, so I hope it matches your actual workflow. – Val Jan 15 '21 at 16:23

1 Answer


As pointed out in the related post, dask has two methods for displaying progress: one for "normal" dask running on the local schedulers (dask.diagnostics.ProgressBar), and one for dask.distributed (the progress function).

Here's a reproducible example:

import random 
from time import sleep

import dask
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress

# simulate work
@dask.delayed
def work(x):
    sleep(x)
    return True


# generate tasks
random.seed(42)
tasks = [work(random.randint(1, 5)) for _ in range(50)]

Using plain dask

ProgressBar().register()
dask.compute(*tasks)

This prints a text progress bar that updates as the tasks complete.
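If a globally registered progress bar is not wanted, `ProgressBar` can also be used as a context manager so that it only covers one `dask.compute()` call. The following is a minimal sketch, not the asker's actual workload: the `double` function and the list of ten tasks are made up for illustration, and it assumes the default local scheduler (no `Client` has been created).

```python
import dask
from dask.diagnostics import ProgressBar


# Hypothetical stand-in for the real per-item work
@dask.delayed
def double(x):
    return 2 * x


# Build the lazy tasks in a plain loop; nothing is executed yet
tasks = [double(i) for i in range(10)]

# The bar is active only inside the with-block
with ProgressBar():
    results = dask.compute(*tasks)

# dask.compute returns a tuple with one entry per task
print(results)
```

Note that a loop that only builds delayed objects finishes almost instantly, so a tqdm bar around it measures graph construction, not the computation itself.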

Using dask.distributed

client = Client()
futures = client.compute(tasks)

progress(futures)

This displays a progress bar for the submitted futures.
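The dashboard only shows work that goes through a distributed scheduler, which is likely why a plain `dask.compute()` without a `Client` never appears there. Below is a small sketch of the distributed route from start to finish, including how to fetch the results. The `double` function is a made-up placeholder, and `processes=False` is an assumption chosen here to keep the example in a single process; a real setup may prefer the default.

```python
import dask
from dask.distributed import Client, progress


# Hypothetical stand-in for the real per-item work
@dask.delayed
def double(x):
    return 2 * x


tasks = [double(i) for i in range(10)]

# Creating a Client starts a local cluster; its dashboard tracks these tasks
client = Client(processes=False)
print(client.dashboard_link)

# client.compute returns immediately with one future per delayed object
futures = client.compute(tasks)

# Text bar in a script, widget in a notebook
progress(futures)

# Block until everything is done and collect the results as a list
results = client.gather(futures)

client.close()
```

Once a `Client` exists, it becomes the default scheduler, so later `dask.compute()` calls in the same session also show up on the dashboard.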

Val