I have a Dask DataFrame whose index (client_id) is not unique. Repartitioning and resetting the index ends up with very uneven partitions - some contain only a few rows, others hundreds of thousands. For instance, the following code:
for p in range(ddd.npartitions):
    print(len(ddd.get_partition(p)))
prints out something like this:
55 17 5 41 51 1144 4391 75153 138970 197105 409466 415925 486076 306377 543998 395974 530056 374293 237 12 104 52 28
My DataFrame is one-hot encoded and has over 500 columns, so the larger partitions don't fit in memory. I want to repartition the DataFrame so that the partitions are roughly even in size. Do you know an efficient way to do this?
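As a side note, the partition lengths above can also be computed in a single pass - one compute() for the whole graph instead of one per partition - using the map_partitions idiom:

# one length per partition, evaluated in a single graph execution
print(ddd.map_partitions(len).compute().tolist())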
EDIT 1
A simple reproduction:
import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': np.arange(0, 10000), 'y': np.arange(0, 10000)})
df2 = pd.DataFrame({'x': np.append(np.arange(0, 4995), np.arange(5000, 10000, 1000)),
                    'y2': np.arange(0, 10000, 2)})
dd_df = dd.from_pandas(df, npartitions=10).set_index('x')
dd_df2 = dd.from_pandas(df2, npartitions=5).set_index('x')
# join on the shared index 'x' (made explicit; there are no common columns)
new_ddf = dd_df.merge(dd_df2, how='right', left_index=True, right_index=True)
#new_ddf = new_ddf.reset_index().set_index('x')
#new_ddf = new_ddf.repartition(npartitions=2)
print(new_ddf.divisions)
for p in range(new_ddf.npartitions):
    print(len(new_ddf.get_partition(p)))
Note the last partitions (a single element each):
1000 1000 1000 1000 995 1 1 1 1 1
Even when we uncomment the commented lines, the partitions remain uneven in size.
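Depending on the Dask version (an assumption on my part - I have not checked in which release this appeared), repartition() also accepts a target partition_size, which may already do what I want:

# assumes a Dask version where repartition() supports partition_size
new_ddf = new_ddf.repartition(partition_size='100MB')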
EDIT 2: Workaround
A simple workaround can be achieved with the following code. Is there a more elegant way to do this (more in the Dask way)?
def repartition(ddf, npartitions=None):
    MAX_PART_SIZE = 100 * 1024

    if npartitions is None:
        npartitions = ddf.npartitions

    # rough per-row byte size derived from the column dtypes (ignores the index)
    one_row_size = sum(dt.itemsize for dt in ddf.dtypes)
    length = len(ddf)
    requested_part_size = length / npartitions * one_row_size
    if requested_part_size <= MAX_PART_SIZE:
        nparts = npartitions
    else:
        nparts = length * one_row_size / MAX_PART_SIZE
    chunksize = int(length / nparts)

    # count rows per index value, sorted by index, so that the divisions
    # never split a single index value across two partitions
    vc = ddf.index.value_counts().to_frame(name='count').compute().sort_index()

    vsum = 0
    divisions = [ddf.divisions[0]]
    for i, v in vc.iterrows():
        vsum += v['count']
        if vsum > chunksize:
            divisions.append(i)
            vsum = 0
    divisions.append(ddf.divisions[-1])
    return ddf.repartition(divisions=divisions, force=True)
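For example, applied to the reproduction above (the variable name balanced is just for illustration):

balanced = repartition(new_ddf)
# the partitions should now hold roughly chunksize rows each
print(balanced.map_partitions(len).compute().tolist())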