
I have a Dask DataFrame whose index (client_id) is not unique. Repartitioning and resetting the index ends up with very uneven partitions: some contain only a few rows, others thousands. For instance, the following code:

for p in range(ddd.npartitions):
    print(len(ddd.get_partition(p)))

prints out something like this:

55
17
5
41
51
1144
4391
75153
138970
197105
409466
415925
486076
306377
543998
395974
530056
374293
237
12
104
52
28
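
As an aside, the partition sizes can also be gathered in a single pass instead of looping over get_partition. A minimal sketch, assuming the same ddd as above:

sizes = ddd.map_partitions(len).compute()  # pandas Series: one row count per partition
print(sizes.tolist())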

My DataFrame is one-hot encoded and has over 500 columns, and the larger partitions don't fit in memory. I want to repartition the DataFrame so that the partitions are roughly even in size. Do you know an efficient way to do this?

EDIT 1

A simple reproduction:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': np.arange(0, 10000), 'y': np.arange(0, 10000)})
df2 = pd.DataFrame({'x': np.append(np.arange(0, 4995), np.arange(5000, 10000, 1000)),
                    'y2': np.arange(0, 10000, 2)})
dd_df = dd.from_pandas(df, npartitions=10).set_index('x')
dd_df2 = dd.from_pandas(df2, npartitions=5).set_index('x')
new_ddf = dd_df.merge(dd_df2, how='right')
# new_ddf = new_ddf.reset_index().set_index('x')
# new_ddf = new_ddf.repartition(npartitions=2)
print(new_ddf.divisions)
for p in range(new_ddf.npartitions):
    print(len(new_ddf.get_partition(p)))

Note the last partitions (a single element each):

1000
1000
1000
1000
995
1
1
1
1
1

Even when the commented lines are uncommented, the partitions remain uneven in size.
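
This is consistent with repartition(npartitions=...) only coalescing neighbouring partitions rather than recomputing divisions from the data. A small check, continuing from the snippet above (exact sizes depend on how Dask combines the divisions):

coalesced = new_ddf.repartition(npartitions=2)
print(coalesced.map_partitions(len).compute().tolist())  # e.g. [4995, 5]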

EDIT 2: Workaround

A simple workaround can be achieved with the following code. Is there a more elegant way to do this (more in a Dask way)?

def repartition(ddf, npartitions=None):
    MAX_PART_SIZE = 100*1024  # target maximum partition size, in bytes

    if npartitions is None:
        npartitions = ddf.npartitions

    # rough size of one row, estimated from the column dtypes
    one_row_size = sum(dt.itemsize for dt in ddf.dtypes)
    length = len(ddf)

    # keep the requested partition count unless the partitions would get too big
    requested_part_size = length / npartitions * one_row_size
    if requested_part_size <= MAX_PART_SIZE:
        nparts = npartitions
    else:
        nparts = length * one_row_size / MAX_PART_SIZE

    chunksize = int(length / nparts)

    # row count per index value, sorted by index so the divisions come out ordered
    vc = ddf.index.value_counts().to_frame(name='count').compute().sort_index()

    # accumulate counts and start a new division whenever a chunk is full
    vsum = 0
    divisions = [ddf.divisions[0]]
    for i, v in vc.iterrows():
        vsum += v['count']
        if vsum > chunksize:
            divisions.append(i)
            vsum = 0
    divisions.append(ddf.divisions[-1])

    return ddf.repartition(divisions=divisions, force=True)
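
For reference, one way to call it on the toy frame from EDIT 1 (hypothetical usage; the resulting partition count also depends on MAX_PART_SIZE):

balanced = repartition(new_ddf, npartitions=4)
print(balanced.map_partitions(len).compute().tolist())  # roughly equal row counts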
Szymon
  • Did you try [repartition](http://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.repartition)? – Primer Oct 04 '18 at 09:58
  • @Primer, yes I did, but it has no effect. The problem seems to be connected with the fact that the dataframe is read from parquet. – Szymon Oct 04 '18 at 14:29
  • @Szymon why do you think it's related to the fact that it's read from parquet? – bartaelterman Feb 06 '19 at 21:02

1 Answer


You're correct that .repartition won't do the trick since it doesn't handle any of the logic for computing divisions and just tries to combine the existing partitions wherever possible. Here's a solution I came up with for the same problem:

import numpy as np
import dask.dataframe as dd


def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    # per-partition value counts of the index, concatenated into one pandas Series
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    # rebuild the sorted index locally from the counts (cheap when there are many duplicates)
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)

The internal function sorted_division_locations already does what you want, but it only works on an actual list-like, not a lazy dask.dataframe.Index. The function above therefore avoids pulling the full index when there are many duplicates: it only fetches the per-value counts and reconstructs the sorted index locally from those.
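
For intuition, sorted_division_locations never splits a repeated index value across partitions, which is what makes it suitable for a non-unique index. A small illustration adapted from the function's docstring (exact output may differ between Dask versions):

from dask.dataframe.io.io import sorted_division_locations

divisions, locations = sorted_division_locations(
    ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'C'], chunksize=3)
print(divisions)  # e.g. ['A', 'B', 'C', 'C']
print(locations)  # e.g. [0, 4, 7, 8]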

If your dataframe is so large that even the index won't fit in memory then you'd need to do something even more clever.

bnaul