0

I need to compute many NumPy arrays (that can be up to 4-dimensional), one for each partition of a Dask dataframe, and then add them as arrays. However, I'm struggling to make map_partitions return an array for each partition instead of a single array for all of them.

import dask.dataframe as dd
import numpy as np, pandas as pd

df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)

def func(partition):
    # Here I also tried returning the array in a list and in a tuple
    return np.array([[1, 2], [3, 4]])

# Here I tried all the options available for 'meta'
results = ddf.map_partitions(func).compute()

Then results is:

array([[1, 2],
       [3, 4],
       [1, 2],
       [3, 4],
       [1, 2],
       [3, 4]])

And if, instead, I do results.sum().compute() I get 30.

What I'd like to get is:

[np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]])]

So that if I compute the sum, I get:

array([[ 3,  6],
       [ 9, 12]])

How can you achieve this result with Dask?

6659081
  • 381
  • 7
  • 21

2 Answers2

0

I managed to make it work like this, but I don't know if this is the best way:

from dask import delayed
results = []
for partition in ddf.partitions:
    result = delayed(func)(partition)
    results.append(result)

delayed(sum)(results).compute()

The result of the computation is:

array([[ 3,  6],
       [ 9, 12]])
6659081
  • 381
  • 7
  • 21
0

You are right, a dask-array is usually to be viewed as a single logical array, which just happens to be made of pieces. Single you are not using the logical layer, you could have done your work with delayed alone. On the other hand, it seems like the end result you want really is a sum over all the data, so maybe even simpler would be an appropriate reshape and sum(axis=)?

ddf.map_partitions(func).compute_chunk_sizes().reshape(
    -1, 2, 2).sum(axis=0).compute()

(compute_chunk_sizes is needed because although your original pandas dataframe had a known size, Dask did not evaluate your function yet to know what sizes it gave back)

However, given your setup, the following would work and be more similar to your original attempt, see .to_delayed()

list_of_delayed = ddf.map_partitions(func).to_delayed().tolist()
tuple_of_np_lists = dask.compute(*list_of_delayed)

(tolist forces evaluating the contained delayed objects)

mdurant
  • 27,272
  • 5
  • 45
  • 74
  • Hi, I tried your solutions but both only seem to work for 2-dimensional arrays. In my case, the arrays can be up to 4-dimensional. With both I get a `KeyError`. – 6659081 Mar 04 '21 at 15:53