I'm running a number of slow tasks on a Dask scheduler, and I want progress reports from each task. The tasks will be submitted from the same machine that handles the progress reports, so both could live in the same process, but for now let's assume that tasks are submitted and progress reports are handled in separate processes.
Dask provides coordination primitives whose intended use cases include monitoring progress:

> These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.
The simplest example I've been able to come up with that makes use of this is the following:
Task submitter:
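```python
import time

from dask.distributed import Client, Pub

c = Client('tcp://127.0.0.1:8786')


def slow_func():
    q = Pub('progress')
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)


future = c.submit(slow_func)
future.result()  # keep the future referenced (and the client open) until the task finishes
```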
Task reporter:
```python
from dask.distributed import Client, Sub

c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
    print(q.get())
```
This works with `Pub`/`Sub`, but it would also work equally well with a `Queue`. Now, even though it does work, it seems like it's not what the authors had in mind:
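For comparison, here is a minimal sketch of the same example with a `Queue` swapped in for `Pub`/`Sub`. To make it runnable without a separate scheduler, it runs both sides in one process against a throwaway in-process cluster (`Client(processes=False)`); `run_demo`, `n_steps` and `delay` are hypothetical names introduced here. One difference worth knowing: a `Queue` keeps items on the scheduler until someone calls `get`, whereas my understanding is that `Pub`/`Sub` is best-effort and may drop messages published before the two ends have found each other.

```python
import time

from dask.distributed import Client, Queue


def slow_func(n_steps=10, delay=1.0):
    # Inside a task, Queue() without a client argument falls back to
    # get_client(), i.e. a client managed by the worker.
    q = Queue("progress")
    for i in range(n_steps):
        q.put(f"{i + 1}/{n_steps}")
        time.sleep(delay)


def run_demo(address=None, n_steps=10, delay=1.0):
    # address=None starts a throwaway in-process cluster; pass
    # "tcp://127.0.0.1:8786" to use an existing scheduler instead.
    client = Client(address) if address else Client(processes=False)
    with client:
        # Reporter side: create the queue explicitly on this client.
        q = Queue("progress", client=client)
        future = client.submit(slow_func, n_steps, delay)
        # Items stay on the scheduler until consumed, so a late reader
        # still receives every report, in order.
        reports = [q.get(timeout=30) for _ in range(n_steps)]
        future.result()
    return reports
```

Calling `run_demo("tcp://127.0.0.1:8786")` should target the scheduler from the examples above. Since every item passes through the scheduler, a `Queue` is probably best kept for low-volume messages like these.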
- I end up implicitly relying on the same `Client` I've used to submit the task when doing the reporting; i.e. the `Client` ends up on the worker nodes. This feels strange.
- I have no way of making distinctions between different tasks; ideally I'd be able to use something like the key of the future as part of the report.
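A sketch touching both points, assuming a `distributed` version where `Worker.get_current_task()` is available: inside a task, `get_worker()` returns the worker running it, so `Pub` can be bound to that worker explicitly rather than to a `Client`, and the current task's key (the same value as `future.key` on the submitting side) can travel with every message. The dict message shape and the helpers `update_progress` and `report_forever` are hypothetical choices for illustration.

```python
import time

from dask.distributed import Client, Pub, Sub, get_worker


def slow_func(n_steps=10, delay=1.0):
    worker = get_worker()
    # Key of the task running in this thread; matches future.key on the client.
    key = worker.get_current_task()
    # Publish through the worker itself rather than through a Client.
    pub = Pub("progress", worker=worker)
    for i in range(n_steps):
        pub.put({"key": key, "done": i + 1, "total": n_steps})
        time.sleep(delay)
    return key


def update_progress(state, msg):
    # Reporter-side bookkeeping: latest (done, total) per task key.
    state[msg["key"]] = (msg["done"], msg["total"])
    return state


def report_forever(address="tcp://127.0.0.1:8786"):
    # Hypothetical reporter process; start it before submitting tasks.
    client = Client(address)
    sub = Sub("progress", client=client)
    state = {}
    while True:
        update_progress(state, sub.get())
        print(", ".join(f"{k}: {d}/{t}" for k, (d, t) in state.items()))
```

As far as I can tell from the signature `Pub(name, worker=None, client=None)`, a `Pub` created inside a task with neither argument already falls back to the worker, so `worker=` here mostly makes that explicit. Starting the reporter first gives publishers a chance to learn about the subscriber, which matters if delivery is best-effort.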
So my admittedly somewhat vague question is: for a "Hello world" style example of a Dask future that provides progress reports, how would I modify the above into something that could be considered idiomatic Dask, and are there any pitfalls to be aware of?
I can partially get around my first issue by creating a new client for each task (example below), but since I end up with something that appears to work just the same, perhaps doing so is unnecessary.
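```python
import time

from dask.distributed import Client, Pub

c_submit = Client('tcp://127.0.0.1:8786')


def slow_func():
    # A separate client, opened inside the task and used only for reporting
    c_report = Client('tcp://127.0.0.1:8786')
    q = Pub('progress', client=c_report)
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)


future = c_submit.submit(slow_func)
future.result()  # keep the future referenced (and the client open) until the task finishes
```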
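If a client on the worker really is wanted (a `Queue`, for instance, needs one), `get_client()` looks like a lighter alternative to opening `Client('tcp://127.0.0.1:8786')` in every task: called inside a task, it returns a client managed by the worker, created once and then reused, and it needs no hard-coded address. A minimal sketch under that assumption; `submit` is a hypothetical helper. (`worker_client()` also exists, but it is aimed at submitting tasks from within tasks and secedes from the worker's thread pool while open.)

```python
import time

from dask.distributed import Client, Pub, get_client


def slow_func(n_steps=10, delay=1.0):
    # Inside a task, get_client() hands back a client managed by the worker
    # (created once, then reused), so no address is hard-coded and no new
    # connection is opened per task.
    pub = Pub("progress", client=get_client())
    for i in range(n_steps):
        pub.put(f"{i + 1}/{n_steps}")
        time.sleep(delay)
    return n_steps


def submit(address="tcp://127.0.0.1:8786"):
    # Hypothetical submitter: block until the task is done so its future
    # is not released early.
    with Client(address) as c_submit:
        return c_submit.submit(slow_func).result()
```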