I have a set of data on which I want to perform a simple groupby/count operation, and I don't seem to be able to do it using dask.
Most probably I don't understand how the groupby/reduce is performed in dask, especially when the index is part of the grouping key. So I'll illustrate my problem with toy data.
So first I create a dataframe with 3 columns.
import pandas as pd
import numpy as np
np.random.seed(0)
df = pd.DataFrame(
    {"A": np.random.randint(6, size=20),
     "B": np.random.randint(6, size=20),
     "C": np.random.rand(20)}
)
df = df.set_index("A")
So I have a dataframe with an index and 2 columns. In pandas I would do:
result = df.groupby(["A", "B"]).C.count().reset_index().set_index("A")
In the end I want to save the result to parquet files.
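For reference, the saving step itself is not the problem; with the pandas result it would be something like the following (the path is just an example, and it assumes a parquet engine such as pyarrow or fastparquet is installed):
result.to_parquet("result.parquet")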
Now let's move to dask, where I can do basically the same operations:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=4)
result = ddf.groupby(["A", "B"]).C.count().reset_index().set_index("A")
Calling compute leads to the same result. However, when inspecting the graph I find:
Computation Graph for GroupBy/Count
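(For what it's worth, the graph image above was produced with something along these lines; it needs graphviz installed and the filename is arbitrary:
result.visualize(filename="groupby_count.png")
)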
We see that everything is reduced to one partition. I can understand this is more or less necessary when the data to be grouped is spread across different partitions, or when the number of actual grouping keys is small.
But in my case the index is part of the grouping key, so the groups are aligned with the partitions and I would expect a completely parallel graph. Moreover, the number of grouping keys is of the same order of magnitude as the number of initial rows (divided by a factor of 2 or 3).
Like the one obtained by this code:
result = ddf.map_partitions(
    lambda x: x.groupby(
        [x.index, x.B]
    ).C.count().reset_index().set_index("A")
)
which gives the following graph: Parallel computation graph
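As a side note, this per-partition result can then be written directly to parquet with one file per partition, roughly like this (the output directory name is just an example):
result.to_parquet("result_parquet")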
So is there a way to obtain this parallel graph with the normal dask groupby/reduce functions?
This is very important when the number of grouping keys is very large.