I have some fairly large CSV files (~10 GB) and would like to take advantage of dask for analysis. However, depending on the number of partitions I set when reading in the dask object, my groupby results change. My understanding was that dask uses the partitions for out-of-core processing benefits but still returns correct groupby output. That doesn't seem to be the case, and I'm struggling to work out what alternate settings are necessary. Below is a small example:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'A': np.arange(100), 'B': np.random.randn(100),
                   'C': np.random.randn(100), 'Grp1': np.repeat([1, 2], 50),
                   'Grp2': np.repeat([3, 4, 5, 6], 25)})

test_dd1 = dd.from_pandas(df, npartitions=1)
test_dd2 = dd.from_pandas(df, npartitions=2)
test_dd5 = dd.from_pandas(df, npartitions=5)
test_dd10 = dd.from_pandas(df, npartitions=10)
test_dd100 = dd.from_pandas(df, npartitions=100)

def test_func(x):
    # fraction of rows in each group where B is positive
    x['New_Col'] = len(x[x['B'] > 0.]) / len(x['B'])
    return x

test_dd1.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head()
   A         B         C  Grp1  Grp2  New_Col
0  0 -0.561376 -1.422286     1     3     0.48
1  1 -1.107799  1.075471     1     3     0.48
2  2 -0.719420 -0.574381     1     3     0.48
3  3 -1.287547 -0.749218     1     3     0.48
4  4  0.677617 -0.908667     1     3     0.48

test_dd2.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head()
   A         B         C  Grp1  Grp2  New_Col
0  0 -0.561376 -1.422286     1     3     0.48
1  1 -1.107799  1.075471     1     3     0.48
2  2 -0.719420 -0.574381     1     3     0.48
3  3 -1.287547 -0.749218     1     3     0.48
4  4  0.677617 -0.908667     1     3     0.48

test_dd5.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head()
   A         B         C  Grp1  Grp2  New_Col
0  0 -0.561376 -1.422286     1     3     0.45
1  1 -1.107799  1.075471     1     3     0.45
2  2 -0.719420 -0.574381     1     3     0.45
3  3 -1.287547 -0.749218     1     3     0.45
4  4  0.677617 -0.908667     1     3     0.45

test_dd10.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head()
   A         B         C  Grp1  Grp2  New_Col
0  0 -0.561376 -1.422286     1     3      0.5
1  1 -1.107799  1.075471     1     3      0.5
2  2 -0.719420 -0.574381     1     3      0.5
3  3 -1.287547 -0.749218     1     3      0.5
4  4  0.677617 -0.908667     1     3      0.5

test_dd100.groupby(['Grp1', 'Grp2']).apply(test_func).compute().head()
   A         B         C  Grp1  Grp2  New_Col
0  0 -0.561376 -1.422286     1     3        0
1  1 -1.107799  1.075471     1     3        0
2  2 -0.719420 -0.574381     1     3        0
3  3 -1.287547 -0.749218     1     3        0
4  4  0.677617 -0.908667     1     3        1

df.groupby(['Grp1', 'Grp2']).apply(test_func).head()
   A         B         C  Grp1  Grp2  New_Col
0  0 -0.561376 -1.422286     1     3     0.48
1  1 -1.107799  1.075471     1     3     0.48
2  2 -0.719420 -0.574381     1     3     0.48
3  3 -1.287547 -0.749218     1     3     0.48
4  4  0.677617 -0.908667     1     3     0.48

Does the groupby step only operate within each partition rather than across the full dataframe? In this case it's trivial to set npartitions=1 (sketched below) and it doesn't seem to impact performance all that much, but since read_csv automatically sets a certain number of partitions, how do you set up the call to ensure that the groupby results are accurate?
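
For reference, here is the single-partition workaround mentioned above as a sketch; repartition is a standard dask.dataframe method, but this trades away parallelism and assumes the grouped data fits in memory on one worker:

# Workaround sketch: collapse to a single partition so the groupby
# sees the whole frame at once.
result = (test_dd100.repartition(npartitions=1)
                    .groupby(['Grp1', 'Grp2'])
                    .apply(test_func)
                    .compute())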

Thanks!

Bhage
  • My first thought is that dask's groupby/apply may not guarantee the order of results, but they might all be there anyways. – shoyer Feb 06 '16 at 02:14
  • Yeah, I was thinking that as well but I've done various unique slices and the results within group end up differing as the partition count increases. In a single set of unique 'grp1/grp2' there would be 2 different values for example. – Bhage Feb 08 '16 at 03:55
  • Any resolution to this issue? – codingknob May 12 '16 at 15:30
  • This is a show-stopper for me. I will continue using a home-grown solution for group-by until there is an explanation or workaround for this issue. I was not able to find the issue the only answer suggested the OP raise. – parity3 Jan 26 '17 at 00:03
  • The resolved issue is here: https://github.com/dask/dask/issues/967 – Eric Ness Nov 08 '18 at 18:09
  • I am also seeing this issue – ionox0 Jul 11 '22 at 16:11

1 Answer


I am surprised by this result. Groupby.apply should return the same results regardless of the number of partitions. If you can supply a reproducible example I encourage you to raise an issue and one of the developers will take a look.
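
A minimal reproducible check might look like the following sketch; the fixed seed and the pd.testing comparison are just there to make the run deterministic, and the loop over partition counts mirrors the question:

import numpy as np
import pandas as pd
import dask.dataframe as dd

np.random.seed(0)  # fixed seed so every run builds identical data
df = pd.DataFrame({'A': np.arange(100), 'B': np.random.randn(100),
                   'C': np.random.randn(100), 'Grp1': np.repeat([1, 2], 50),
                   'Grp2': np.repeat([3, 4, 5, 6], 25)})

def test_func(x):
    # fraction of rows in each group where B is positive
    x['New_Col'] = len(x[x['B'] > 0.]) / len(x['B'])
    return x

expected = df.groupby(['Grp1', 'Grp2']).apply(test_func)

for n in [1, 2, 5, 10, 100]:
    result = (dd.from_pandas(df, npartitions=n)
                .groupby(['Grp1', 'Grp2'])
                .apply(test_func)
                .compute())
    # Sort by index before comparing, since dask does not guarantee
    # row order in groupby-apply output.
    pd.testing.assert_frame_equal(result.sort_index(), expected.sort_index())

If the assertion fails for any partition count, that script plus the traceback would make a good issue report.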

MRocklin
  • I've tested the OP and it's not returning the same result for all partitions. Maybe this unexpected behavior is happening again due to https://stackoverflow.com/a/60725384? – fjsj Apr 10 '20 at 14:47