
I'm running a number of slow tasks on a Dask scheduler, and I want progress reports from each task. The tasks will be submitted from the same machine that will handle progress reports, so that could be kept in the same process, but for now let us just assume that tasks are submitted and progress reports handled in separate processes.

Dask provides Coordination Primitives whose intended use cases include being able to monitor progress:

These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.

The simplest example I've been able to conjure that makes use of this is the following:

Task submitter:

from dask.distributed import Client, Pub
import time

c = Client('tcp://127.0.0.1:8786')

def slow_func():
    q = Pub('progress')
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

# Keep a reference to the future; Dask may cancel tasks whose futures are released.
future = c.submit(slow_func)

Task reporter:

from dask.distributed import Client, Sub

c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
    print(q.get())

This works with Pub/Sub, and would work equally well with a Queue. Even though it works, it does not seem to be what the authors had in mind:

  • I end up implicitly relying on the same Client I've used to submit the task when doing the reporting; i.e. the Client ends up on the worker nodes. This feels strange.
  • I have no way of making distinctions between different tasks; ideally I'd be able to use something like the key of the future as part of the report.

So my, admittedly somewhat vague, question is: As far as creating a "Hello world" style example of a Dask future providing progress reports, how would I modify the above to something that could be considered idiomatic Dask, and are there any pitfalls to be aware of?

I can partially get around my first issue by creating a new client for each task (example below), but since I end up with something that appears to work just the same, perhaps doing so is unnecessary.

import time
from dask.distributed import Client, Pub

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    c_report = Client('tcp://127.0.0.1:8786')
    q = Pub('progress', client=c_report)
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

# Keep a reference to the future; Dask may cancel tasks whose futures are released.
future = c_submit.submit(slow_func)

fuglede
  • Have you seen this? https://stackoverflow.com/questions/49039750/how-to-see-progress-of-dask-compute-task – KRKirov Dec 24 '19 at 11:47

1 Answer


The first part of the question is answered by the existence of dask.distributed.worker_client which does exactly what we need: Provide a client talking to the scheduler of the current worker. With that, the task submitter becomes the following:

import time
from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{i}/10')
            time.sleep(1)

# Keep a reference to the future; Dask may cancel tasks whose futures are released.
future = c_submit.submit(slow_func)

For the second part, one non-terrible approach would be to simply generate an ID every time the task is submitted. That is, do something like this:

import time
import uuid

from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func(task_id):
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{task_id}: {i}/10')
            time.sleep(1)

# Keep a reference to the future; Dask may cancel tasks whose futures are released.
future = c_submit.submit(slow_func, uuid.uuid4())

This works and solves my problem, but it still feels a bit weird to use a new ID when the future already has a perfectly usable one in its key.
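For completeness, here is a rough sketch of what the reporting side could look like once messages carry an ID. It assumes the `'{task_id}: {i}/10'` message format used above; the names `parse_progress` and `report_progress` are made up for this illustration and are not part of Dask. The Dask import is deferred into the function so the parsing helper can be tried without a running scheduler.

```python
def parse_progress(message):
    """Split '<task_id>: <done>/<total>' into (task_id, done, total)."""
    # rpartition splits on the last ': ', so the ID may itself contain colons.
    task_id, _, fraction = message.rpartition(': ')
    done, _, total = fraction.partition('/')
    return task_id, int(done), int(total)


def report_progress(address='tcp://127.0.0.1:8786'):
    """Print progress per task; blocks forever, like the reporter loop above."""
    # Imported here so the module loads even without a cluster available.
    from dask.distributed import Client, Sub

    client = Client(address)
    sub = Sub('progress', client=client)
    latest = {}  # task_id -> (done, total), the most recent report per task
    while True:
        task_id, done, total = parse_progress(sub.get())
        latest[task_id] = (done, total)
        print(f'{task_id}: {done}/{total} ({len(latest)} task(s) seen)')
```

Calling `report_progress()` in the reporter process would then replace the bare `while True` loop from the question.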

fuglede
  • `distributed.utils.thread_state.key` has the key of the task that you're within now if that helps. – MRocklin Dec 24 '19 at 19:27
  • Thanks, that's exactly what I had in mind (and now that I know what to search for, I also see that that part of the question [has been asked before](https://stackoverflow.com/q/39330017/5085211))! I can edit the answer to also include this information, but if you prefer, I'd also happily accept this as new answer if you were to post one. – fuglede Dec 25 '19 at 11:08
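
Following up on the comment about the task key, a possible variant that reports under the future's own key instead of a generated UUID is sketched below. It assumes that `get_worker().get_current_task()` is available in the installed version of `distributed` and returns the key of the running task; `distributed.utils.thread_state.key`, as named in the comment, is the alternative on versions where that attribute exists. The helper `format_progress` is a made-up name for this illustration.

```python
import time


def format_progress(key, done, total):
    """Build the message published on the 'progress' topic."""
    return f'{key}: {done}/{total}'


def slow_func():
    # Imported inside the task so this module loads without a cluster.
    from dask.distributed import Pub, get_worker, worker_client

    # Assumption: get_current_task() returns the key of the task running in
    # this thread; it is read before worker_client() is entered.
    key = get_worker().get_current_task()
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(format_progress(key, i, 10))
            time.sleep(1)
```

On the submitting side, `future = c_submit.submit(slow_func)` followed by `future.key` should then give the same string that prefixes each message, so the reporter can match messages to futures.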