I guess I'm missing something (still a Dask Noob) but I'm trying the batching suggestion to avoid too many Dask tasks from here:

https://docs.dask.org/en/latest/delayed-best-practices.html

and can't make it work. This is what I tried:

import dask

def f(x):
    return x*x

def batch(seq):
    sub_results = []
    for x in seq:
        sub_results.append(f(x))
    return sub_results

batches = []
for i in range(0, 1000000000, 1000000):
    result_batch = dask.delayed(batch, range(i, i + 1000000))
    batches.append(result_batch)

Batches now contains delayed objects:

batches[:3]

[Delayed(range(0, 1000000)),
 Delayed(range(1000000, 2000000)),
 Delayed(range(2000000, 3000000))]

but when I compute them I get batch function pointers (I think??):

results = dask.compute(*batches)
results[:3]

(<function __main__.batch(seq)>,
 <function __main__.batch(seq)>,
 <function __main__.batch(seq)>)

I have two questions:

  1. Is this really how it should be run? It seems to contradict the first item on the best practices page, which says not to call it like delayed(f(x)) because that would run immediately instead of lazily.

  2. How do I get the results of the batched run above?

K.-Michael Aye

1 Answer

It looks like your code is missing a pair of parentheses. Not sure if this was a typo (???).

Per the example from the docs, I think you wanted

result_batch = dask.delayed(batch)(range(i, i + 1000000))

where I replaced "batch, ran..." with "batch)(ran...", since it is the call to the batch() function that should be delayed.
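
To make the fix concrete, here is a minimal sketch of the corrected loop. The sizes (total and batch_size) are shrunk for illustration and are my assumption, not values from the question; f and batch are the same as in the question. In the original call the range was passed to dask.delayed as a second argument rather than to batch, which would be consistent with the Delayed(range(...)) output shown in the question.

```python
import dask

def f(x):
    return x * x

def batch(seq):
    # Process a whole chunk of inputs inside a single task.
    return [f(x) for x in seq]

# Small sizes, assumed for illustration so the example finishes quickly.
total, batch_size = 100, 10

batches = [
    # Wrap the function first, then call the wrapper with the arguments.
    dask.delayed(batch)(range(i, i + batch_size))
    for i in range(0, total, batch_size)
]

# dask.compute returns a tuple with one result (here, a list) per batch.
results = dask.compute(*batches)

# Flatten the per-batch lists into a single list of squares.
flat = [y for sub in results for y in sub]
```

Each element of results is the list returned by one batch call, so flattening them recovers the squares in order.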

Answers

  1. With the typo fixed, your code works fine for me: the computation is now delayed. Regarding the start of the docs page, what matters is what gets wrapped by dask.delayed. With dask.delayed(batch(range(i, i + 1000000))), Python evaluates batch(...) first, so the function runs immediately and dask.delayed only wraps its finished result, which is not the desired workflow. With dask.delayed(batch)(range(i, i + 1000000)), dask.delayed wraps the function itself, so the call is recorded as a task and runs only when you compute it. I believe this is what the docs are trying to say at the start of the best practices section.
  2. Again, with the typo fixed, your code runs as expected for me and prints the lengthy output to the screen.
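
The difference between the two spellings can be checked with a small sketch. The calls list is a hypothetical helper I added to record when f actually runs; it is not part of the question's code.

```python
import dask

calls = []  # hypothetical helper: records every real invocation of f

def f(x):
    calls.append(x)
    return x * x

# Eager: Python evaluates f(3) first, then dask wraps the finished value 9.
eager = dask.delayed(f(3))
calls_after_eager = list(calls)  # f has already run once at this point

# Lazy: dask wraps f itself; calling the wrapper only records a task.
lazy = dask.delayed(f)(4)
calls_after_lazy = list(calls)  # unchanged, f(4) has not run yet

eager_value = eager.compute()  # just returns the value computed earlier
lazy_value = lazy.compute()    # f(4) actually runs here
```

Comparing calls_after_eager and calls_after_lazy shows that only the first form ran f at construction time.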
edesz
  • Yeah, makes sense. I just copied this from their docs without understanding it at the time, as it was wrong there, too: https://github.com/dask/dask/commit/5ad732dcec47bae2821898eb64787538df37d6e2#diff-ae61d6a5eef0f1f154c1ef766680b0f5411285d3aee63e5d959069d7056f004e – K.-Michael Aye Dec 12 '20 at 21:27
  • Ah, I didn't think to look as deep as an issue. That explains it. – edesz Dec 13 '20 at 00:28