As is well documented, Dask creates a strictly increasing index on a per-partition basis when reset_index is called, resulting in duplicate indices over the whole set. What is the best way (e.g. computationally quickest) to create a strictly increasing index in Dask (it doesn't have to be consecutive) over the whole set? I was hoping map_partitions would pass in the partition number, but I don't think it does. Thanks.
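For concreteness, here is a minimal illustration of the duplicated indices (toy data; the column name x is arbitrary):

import pandas as pd
import dask.dataframe as dd

# 6 rows split across 3 partitions of 2 rows each
ddf = dd.from_pandas(pd.DataFrame({"x": range(6)}), npartitions=3)

# reset_index restarts from 0 inside each partition, so the combined
# index comes back as 0, 1, 0, 1, 0, 1 rather than 0..5
print(ddf.reset_index(drop=True).compute().index.tolist())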
EDIT
Thanks @MRocklin, I've got this far, but I need a little assistance on how to recombine my series with the original dataframe.
import numpy as np
import dask.dataframe as dd

def create_increasing_index(ddf: dd.DataFrame):
    # generous upper bound on partition size so per-partition offsets never overlap
    mps = int(len(ddf) / ddf.npartitions + 1000)
    # dask array whose chunks line up with the dataframe's partitions
    values = ddf.index.values

    def do(x, max_partition_size, block_id=None):
        length = len(x)
        if length == 0:
            raise ValueError("Does not work with empty partitions. Consider using dask.repartition.")
        # offset each block by its position so the result is globally increasing
        start = block_id[0] * max_partition_size
        return np.arange(start, start + length, dtype=np.int64)

    series = values.map_blocks(do, max_partition_size=mps, dtype=np.int64)
    ddf2 = dd.concat([ddf, dd.from_array(series)], axis=1)
    return ddf2
This is where I get the error "ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1". Is there a better way than using dd.concat? Thanks.
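For reference, another route I've been sketching (untested at scale; the helper name add_global_index and the column name new_index are just placeholders) side-steps dd.concat and map_blocks entirely: compute the partition lengths up front, then number each partition via to_delayed / from_delayed. It costs one extra pass over the data to learn the lengths.

import numpy as np
import dask.dataframe as dd
from dask import delayed

def add_global_index(ddf, col="new_index"):
    # one cheap pass to learn each partition's length
    sizes = ddf.map_partitions(len).compute().to_numpy()
    # starting offset for each partition: 0, len0, len0+len1, ...
    offsets = np.concatenate([[0], np.cumsum(sizes)[:-1]])

    @delayed
    def number_partition(part, start):
        part = part.copy()
        part[col] = np.arange(start, start + len(part), dtype="int64")
        return part

    parts = [number_partition(p, s)
             for p, s in zip(ddf.to_delayed(), offsets)]
    # _meta (dask-internal empty frame) describes the output schema
    meta = ddf._meta.assign(**{col: np.array([], dtype="int64")})
    return dd.from_delayed(parts, meta=meta)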
EDIT AGAIN
Actually, for my purposes (and the amounts of data I was testing on, only a few GB) the cumsum approach is fast enough. I'll revisit this when it becomes too slow!
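For anyone who finds this later, the cumsum version I settled on looks roughly like this (a sketch on toy data; the column name idx is arbitrary):

import pandas as pd
import dask.dataframe as dd

# toy frame standing in for the real data
ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=3)

# a column of ones, cumulatively summed, gives a strictly increasing
# (1-based, consecutive) column across all partitions
ddf["idx"] = 1
ddf["idx"] = ddf["idx"].cumsum()

# optionally promote it to the index; sorted=True should be safe here
# because the cumsum output is already monotonically increasing
# ddf = ddf.set_index("idx", sorted=True)

My understanding is that cumsum has to combine the partition totals in order, which is presumably why it could become a bottleneck at much larger scale.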