I guess I'm missing something (I'm still a Dask noob), but I'm trying the batching suggestion for avoiding too many Dask tasks from here:
https://docs.dask.org/en/latest/delayed-best-practices.html
and I can't make it work. This is what I tried:
import dask

def f(x):
    return x*x

def batch(seq):
    sub_results = []
    for x in seq:
        sub_results.append(f(x))
    return sub_results

batches = []
for i in range(0, 1000000000, 1000000):
    result_batch = dask.delayed(batch, range(i, i + 1000000))
    batches.append(result_batch)
The batches list now contains delayed objects:
batches[:3]
[Delayed(range(0, 1000000)),
Delayed(range(1000000, 2000000)),
Delayed(range(2000000, 3000000))]
but when I compute them, I get what I think are pointers to the batch function:
results = dask.compute(*batches)
results[:3]
(<function __main__.batch(seq)>,
<function __main__.batch(seq)>,
<function __main__.batch(seq)>)
I have two questions:

1. Is this really how it should be run? It seems to contradict the first line of the best practices page, which says not to call it like delayed(f(x)) because that would run immediately instead of lazily.
2. How do I get the results of the batched run above?
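For reference, here is my reading of the batching example from that page, shrunk to a much smaller range so it finishes quickly. The dask.delayed(batch)(seq) call (wrapping the function first, then calling the wrapped version on the arguments) is my interpretation of the docs, and I may be misreading it, hence question 1:

import dask

def f(x):
    return x * x

def batch(seq):
    # run f over a whole chunk of inputs inside a single task
    return [f(x) for x in seq]

# much smaller range than my real run, just to check the pattern
batches = []
for i in range(0, 10000, 1000):
    # wrap batch first, then call the wrapped version on the sequence:
    # dask.delayed(batch)(seq) rather than dask.delayed(batch, seq)
    result_batch = dask.delayed(batch)(range(i, i + 1000))
    batches.append(result_batch)

results = dask.compute(*batches)  # tuple of lists, one list per batch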