6

Empirically, it seems that whenever you call `set_index` on a Dask dataframe, Dask puts all rows with the same index value into a single partition, even if this results in wildly imbalanced partitions.

Here is a demonstration:

import pandas as pd
import dask.dataframe as dd

users = [1]*1000 + [2]*1000 + [3]*1000

df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)

ddf = ddf.set_index('user')

counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500    1000
# 999    2000
# dtype: int64

However, I found no guarantee of this behaviour anywhere.

I have tried to sift through the code myself but gave up. I believe one of a few inter-related shuffle functions probably holds the answer.

When you set_index, is it the case that a single index can never be in two different partitions? If not, then under what conditions does this property hold?


Bounty: I will award the bounty to an answer that draws from a reputable source. For example, referring to the implementation to show that this property has to hold.

Dahn

3 Answers

2

is it the case that a single index can never be in two different partitions?

No, it's certainly allowed. Dask will even intend for this to happen. However, because of a bug in set_index, all the data will still end up in one partition.

An extreme example (every row is the same value except one):

In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)

As you can see, Dask intends for the 0s to be split up between multiple partitions. Yet when the shuffle actually happens, all the 0s still end up in one partition:

In [7]: import dask
In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
Out[8]: 
([Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)

This is because the code that decides which output partition a row belongs to doesn't account for duplicates in divisions. Treating the divisions as a Series, it uses `searchsorted` with `side="right"`, which is why all the data always ends up in the last partition.
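The effect of `side="right"` on duplicated divisions can be sketched with plain pandas (this is an illustration of the mechanism, not Dask's actual code):

```python
import pandas as pd

# Divisions from the example above, with the duplicated boundary 0
divisions = pd.Series((0, 0, 0, 0, 0, 0, 0, 1))

# side="right" returns the position after the *last* matching boundary,
# so an index value of 0 skips past every duplicated division and all
# such rows get assigned to the same (last possible) partition
pos = divisions.searchsorted(0, side="right")
print(pos)  # 7
```

With `side="left"` the same lookup would return position 0 instead, which is why the choice of side sends every duplicated value to one end.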

I'll update this answer when the issue is fixed.

0

Is it the case that a single index can never be in two different partitions?

IIUC, the answer for practical purposes is yes.

A dask dataframe will in general have multiple partitions, and dask may or may not know the index values associated with each partition (see Partitions). If dask does know which partition contains which index range, this will be reflected in the `df.divisions` output (if not, the divisions will be a tuple of `None` values).

When running `.set_index`, dask will compute divisions, and in determining them it appears to require that the divisions be sorted and unique (except for the last element, which may repeat the previous one). The relevant code is here.
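This invariant can be sketched in plain Python (a hypothetical check mirroring the stated rule, not the actual Dask code):

```python
def divisions_look_valid(divisions):
    """Sketch of the rule: division boundaries must be strictly
    increasing, except that the last boundary may equal the previous
    one (it is the inclusive maximum of the final partition)."""
    interior = divisions[:-1]
    strictly_increasing = all(a < b for a, b in zip(interior, interior[1:]))
    return strictly_increasing and divisions[-2] <= divisions[-1]

print(divisions_look_valid((0, 3, 5)))     # True
print(divisions_look_valid((0, 5, 5)))     # True: last element may repeat
print(divisions_look_valid((0, 0, 0, 1)))  # False: duplicated interior boundary
```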

So two potential follow-up questions arise: why not allow non-sorted indexing in general, and, as a special case of that, why not allow duplicate index values across partitions?

With regard to the first question: for smallish data it might be feasible to design something that allows non-sorted indexing, but you can imagine that general non-sorted indexing won't scale well, since dask would need to store the index values of each partition somehow.

With regard to the second question: this seems like it should be possible, but right now it does not appear to be implemented correctly. See the snippet below:

# use this to generate 10 indexed partitions
import pandas as pd

for user in range(10):
    
    df = pd.DataFrame({'user_col': [user//3]*100})
    df['user'] = df['user_col']
    df = df.set_index('user')
    df.index.name = 'user_index'
    
    df.to_parquet(f'test_{user}.parquet', index=True)


# now load them into a dask dataframe
import dask.dataframe as dd

ddf = dd.read_parquet('test_*.parquet')

# dask will know about the divisions
print(ddf.known_divisions) # True

# further evidence
print(ddf.divisions) # (0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3)

# this should show three partitions, but will show only one
print(ddf.loc[0].npartitions) # 1
SultanOrazbayev
  • Thanks for the answer. I think my question could've been clearer about the fact that I'm specifically interested in the behaviour of Dask's `set_index`. This answer is quite relevant and interesting, but doesn't address that. – Dahn Oct 18 '21 at 23:53
  • Btw, for me, in your example, the divisions are actually unknown (`known_divisions` is `False`). I'm using Dask `2021.08.01`, Pandas `1.3.1` and PyArrow `5.0.0` – Dahn Oct 18 '21 at 23:56
  • No worries, I'm not confident in dask internals. – SultanOrazbayev Oct 19 '21 at 10:50
0

I have just noticed that Dask's documentation for shuffle says

After this operation, rows with the same value of `on` will be in the same partition.

This seems to confirm my empirical observation.

Dahn