
I have the following scenario that I need to solve with Dask scheduler and workers:

  • Dask program has N functions called in a loop (N defined by the user)

  • Each function is started with delayed(func)(args) to run in parallel.

  • When each function from the previous point starts, it triggers W workers. This is how I invoke the workers:

    futures = client.map(worker_func, worker_args)     
    worker_responses = client.gather(futures)
    

That means that I need N * W workers to run everything in parallel. The problem is that this is not optimal: it's too much resource allocation, I run it on the cloud, and it's expensive. Also, N is defined by the user, so I don't know beforehand how much processing capability I need to have.

Is there a way to queue up the workers in such a way that, if I define that Dask has X workers, the next one starts when a worker ends?

ps0604
  • I think you have a misconception about how the client behaves, which is the source of your problem! Specifically, when you refer to workers and running in parallel, you probably mean the same thing, as the client behaves like _both_ a [concurrent.futures executor](https://docs.python.org/3/library/concurrent.futures.html) _and_ also handles the resulting Futures from your `.map()` automatically – ti7 Aug 01 '21 at 13:23
  • Let's say I have 100 workers available, one scheduler, and the application that invokes the scheduler. This application calls 5 functions, each with `delayed`. Each of these functions invokes 50 workers (in total, the application needs to run 250 workers). Since 250 is greater than 100, I need to know how Dask distributed will handle the request to start worker number 101. Will Dask wait until one of the busy 100 workers ends to call the next worker? – ps0604 Aug 01 '21 at 13:32
  • I would avoid spawning additional workers and just let the scheduler schedule on the available ones - yes, Dask will schedule the work amongst the available workers when there are more tasks than workers – ti7 Aug 01 '21 at 13:44
  • I find that you are confusing "worker" (a process running on a machine which can run tasks) and "task" (a function call with arguments). A scheduler distributes tasks, and does *not* start workers. A cluster manager may start workers, but I don't think you are using that. Submitting 250 tasks to 100 workers is totally fine; the scheduler will run new tasks as workers become free. – mdurant Aug 01 '21 at 18:21
  • @mdurant I get it now, thanks for the explanation – ps0604 Aug 02 '21 at 00:15
  • One thing that I noticed is that if I have 3 workers and start 6 tasks with `client.map`, all the tasks seem to run in parallel, instead of having 3 tasks run first and then the other 3 when they finish. Am I doing something wrong? Each of my workers runs in a Docker container. – ps0604 Aug 02 '21 at 12:53
  • @ps0604 a few things could be happening, but assuming none of the tasks depend on each other, each worker may simply have more than 1 thread (and so it can do more work at once), or you're just observing them being scheduled to the workers, but they don't really run at the same time or are faster than the observation window (2s by default, https://docs.dask.org/en/latest/configuration-reference.html#client) – ti7 Aug 02 '21 at 14:20
  • Thanks, printing the Client I get ``. This means that I have 24 threads and 3 workers; does that mean that each worker has up to 8 threads? – ps0604 Aug 04 '21 at 12:26
  • yes, if you didn't choose explicitly, each of your 3 workers has one process controlling 8 threads (one for each logical core by default) https://stackoverflow.com/questions/49406987/how-do-we-choose-nthreads-and-nprocs-per-worker-in-dask-distributed – ti7 Aug 05 '21 at 08:11
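
To make the worker/thread split from the comments above concrete, here is a minimal sketch, assuming a LocalCluster rather than the Docker-based workers described there; it pins both counts explicitly instead of relying on the one-thread-per-logical-core default:

# A minimal sketch (assumption: LocalCluster instead of the Docker-based workers
# from the comments): pin the worker count and threads per worker explicitly.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=3, threads_per_worker=8)  # 3 workers x 8 threads = 24 threads
client = Client(cluster)
print(client)  # the repr reports the total worker and thread counts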

1 Answer


First, define the number of workers you need and treat them as ephemeral, but static for the entire duration of your processing.
You can create them dynamically (when you start or later on), but you probably want to have them all ready right at the beginning of your processing.
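
As a sketch of that setup (assuming a LocalCluster here; cloud cluster managers such as dask-kubernetes or dask_cloudprovider typically expose the same scale()/adapt() methods), you can start a fixed pool up front and only resize it if you really need to:

# A sketch under the assumption of a LocalCluster; cloud cluster managers
# (e.g. dask-kubernetes, dask_cloudprovider) typically offer the same methods.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4)    # X workers, all ready before the work starts
# cluster.scale(8)                     # grow (or shrink) the pool later if needed
# cluster.adapt(minimum=2, maximum=8)  # or let Dask resize it within bounds
client = Client(cluster)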

From your view, the client behaves like an executor (so when you refer to workers and running in parallel, you probably mean the same thing):

This class resembles executors in concurrent.futures but also allows Future objects within submit/map calls. When a Client is instantiated it takes over all dask.compute and dask.persist calls by default.

Once your workers are available, Dask will distribute the work you submit among them via the scheduler.
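
To make that concrete for the N * W case from the question, here is a minimal sketch (worker_func and worker_args are placeholders for your own functions and arguments): with only a handful of workers, all the tasks are queued immediately and each one starts as soon as a worker frees up.

# A minimal sketch (worker_func / worker_args are placeholders): submitting many
# more tasks than workers is fine; the scheduler queues them and starts each task
# as soon as a worker is free.
from dask.distributed import Client, LocalCluster

def worker_func(arg):
    return arg * 2                              # stand-in for the real work

client = Client(LocalCluster(n_workers=4, threads_per_worker=1))  # only 4 task slots

worker_args = list(range(250))                  # 250 tasks, i.e. N * W >> 4
futures = client.map(worker_func, worker_args)  # all 250 queued immediately
results = client.gather(futures)                # at most 4 run at any one time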

You should make any tasks that depend on each other do so by passing the preceding function's result into the next dask.delayed() call (that result is a Future, not yet the actual value).
Passing Futures as arguments like this allows Dask to build a task graph of your work.

Example use: https://examples.dask.org/delayed.html
Future reference: https://docs.dask.org/en/latest/futures.html#distributed.Future

Dependent Futures with dask.delayed

Here's a complete example from the Delayed docs (it actually combines several successive examples that build to the same result):

import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)    # depends on a and b
    output.append(c)

total = dask.delayed(sum)(output)  # depends on everything
total.compute()  # 50

You can call `total.visualize()` to see the task graph.

(task graph image from the Dask Delayed docs)

Collections of Futures

If you're already using `.map(..)` to map function and argument pairs, you can keep collecting the resulting Futures and then `.gather(..)` them all at once, even if they're nested in a collection (which is convenient here).

The `.gather()`'ed results will be in the same arrangement as they were given (a list of lists):

[[fn1(args11), fn1(args12)], [fn2(args21)], [fn3(args31), fn3(args32), fn3(args33)]]

https://distributed.dask.org/en/latest/api.html#distributed.Client.gather

import dask
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

collection_of_futures = []

# iterable_of_pairs_of_fn_args stands for your own sequence of (function, args) pairs
for worker_func, worker_args in iterable_of_pairs_of_fn_args:
    futures = client.map(worker_func, worker_args)
    collection_of_futures.append(futures)

results = client.gather(collection_of_futures)

Notes

  • worker_args must be some iterable that can be mapped onto worker_func, which can be a source of error (see the short sketch after these notes)
  • `.gather()`ing will block until all the futures are complete, or raise if any of them fails
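
A short sketch of that first note, using a hypothetical inc function: `.map()` expects an iterable of arguments and creates one task per element, while a single call should go through `.submit()` instead.

# A short sketch (inc is a hypothetical stand-in): .map() wants an iterable of
# arguments, one task per element; use .submit() for a single function call.
from dask.distributed import Client

client = Client(...)  # connect to distributed cluster

def inc(x):
    return x + 1

futures = client.map(inc, [1, 2, 3])  # three tasks: inc(1), inc(2), inc(3)
single = client.submit(inc, 3)        # one task; a bare 3 passed to .map() would fail
print(client.gather(futures), single.result())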

.as_completed()

If you need the results as quickly as possible, you could use `.as_completed(..)`, but note that the results will arrive in a non-deterministic order, so I don't think this makes sense for your case. If you find it does, you'll need some extra guarantees:

  • include information about what to do with the result in the result
  • keep a reference to each and check them
  • only combine groups where it doesn't matter (i.e. all the Futures have the same purpose)

Also note that the yielded futures are complete, but they are still `Future` objects, so you still need to call `.result()` on them or `.gather()` them.

https://distributed.dask.org/en/latest/api.html#distributed.as_completed
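
Here's a minimal sketch of that pattern (inc is again a hypothetical stand-in): the loop receives each Future as soon as it finishes, in completion order, and still has to ask for its result.

# A minimal sketch (inc is a hypothetical stand-in): as_completed() yields each
# Future as soon as it finishes, in completion order; .result() is still needed.
from dask.distributed import Client, as_completed

client = Client(...)  # connect to distributed cluster

def inc(x):
    return x + 1

futures = client.map(inc, range(10))

for future in as_completed(futures):
    print(future.result())  # values arrive in whatever order the tasks finished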

ti7
  • I invoke the workers with dask distributed: `futures = client.map(worker_func, worker_args); worker_responses = client.gather(futures)`. How can I incorporate `delayed` into that? – ps0604 Aug 01 '21 at 11:59
  • hmm.. if you arrange your input like that, it should already be building a collection of Futures and then gathering the result - if you don't immediately want the result (i.e. you have many function and worker_args pairs), collect the result of each `.map(..)` into a list and then `.gather()` them all at once - let me update my answer – ti7 Aug 01 '21 at 12:14