Here are two ways to create a sortable column ROW_UID in your Dask DataFrame.

Method 1 creates a string column ROW_UID which looks like "{partition_i}-{row_i}".

Method 2 creates an int64 column ROW_UID. The values are the corresponding row-index across the dataframe, i.e. the row-index you would get if you had called .compute().
Both methods rely on a small note at the end of the Dask docs for map_partitions:
Your map function gets information about where it is in the dataframe by accepting a special partition_info keyword argument.
def func(partition, partition_info=None):
    pass

This will receive the following information:

>>> partition_info
{'number': 1, 'division': 3}
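For illustration, here is a minimal sketch (toy dataframe and a hypothetical helper name, not part of the methods below) that prints what each partition receives:

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({'x': range(10)}), npartitions=2)

def show_partition_info(df_part, partition_info=None):
    ## Hypothetical helper, just for illustration.
    print(partition_info)  ## e.g. {'number': 0, 'division': 0}
    return df_part

## Providing meta explicitly avoids an extra meta-inference call,
## in which partition_info would be None.
ddf.map_partitions(show_partition_info, meta=ddf).compute()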
Method 1 (fast): set ROW_UID to be a unique string
Here, we just prefix the partition number to a row-id, where the row-id is unique within each partition:
import math
from typing import Dict, List, Optional

import pandas as pd

def get_num_zeros_to_pad(max_i: int) -> int:
    assert isinstance(max_i, int) and max_i >= 1
    num_zeros = math.ceil(math.log10(max_i))  ## Ref: https://stackoverflow.com/a/51837162/4900327
    if max_i == 10 ** num_zeros:  ## If max_i is a power of 10, we need one more digit
        num_zeros += 1
    return num_zeros

def pad_zeros(i: int, max_i: Optional[int] = None) -> str:
    assert isinstance(i, int) and i >= 0
    if max_i is None:
        return str(i)
    assert isinstance(max_i, int) and max_i >= i
    num_zeros: int = get_num_zeros_to_pad(max_i)
    return f'{i:0{num_zeros}}'

def set_unique_row_num(df_part: pd.DataFrame, npartitions: int, partition_info: Optional[Dict] = None):
    df_part_len: int = len(df_part)
    ## Zero-pad the partition number (to the number of partitions) and the
    ## row number (to the partition length), so string sort order is stable:
    part_id: str = pad_zeros(partition_info['number'], npartitions)
    row_uids = [f'{part_id}-{pad_zeros(row_i, df_part_len)}' for row_i in range(df_part_len)]
    return df_part.assign(ROW_UID=row_uids)

df = df.map_partitions(
    set_unique_row_num,
    npartitions=df.npartitions,
    meta={**df.dtypes.to_dict(), 'ROW_UID': str},
)
# assert df['ROW_UID'].nunique().compute() == len(df)
Output:
>>> df['ROW_UID'].partitions[3].compute()
189079 003-00000
189080 003-00001
189081 003-00002
189082 003-00003
189083 003-00004
...
252101 003-63022
252102 003-63023
252103 003-63024
252104 003-63025
252105 003-63026
Name: ROW_UID, Length: 63027, dtype: object
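Because both components are zero-padded, lexicographic (string) order on these IDs agrees with row order, which is what makes the column sortable. A quick sketch of why the padding matters (hypothetical ROW_UID values):

sorted(['003-00010', '003-00002', '010-00000'])
## -> ['003-00002', '003-00010', '010-00000']
## Without padding, '10-0' would sort before '3-2', since '1' < '3' as characters.
df = df.sort_values('ROW_UID')  ## hypothetical usage; assumes a Dask version with sort_values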
Method 2 (slow when data has not been persisted): set ROW_UID to be a unique number
This method gives us unique numeric row-ids across the dataframe. E.g. if partition#0 has 5000 rows and partition#1 has 7000 rows, this will add a column ROW_UID which takes values 0-4999 for partition#0, 5000-11999 for partition#1, etc.

This method requires computing the length of each partition and passing it to map_partitions. This can be slow if your dataframe has not yet been persisted, so make sure you call .persist() before running the following code.

Side note: this is equivalent to the pandas command df.reset_index(drop=True).reset_index().rename(columns={'index': 'ROW_UID'})
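For reference, here is what that pandas command does on a toy dataframe:

import pandas as pd

pdf = pd.DataFrame({'x': ['a', 'b', 'c']}, index=[10, 20, 30])
print(pdf.reset_index(drop=True).reset_index().rename(columns={'index': 'ROW_UID'}))
##    ROW_UID  x
## 0        0  a
## 1        1  b
## 2        2  c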
def set_unique_row_num(df_part: pd.DataFrame, partition_lens: List[int], partition_info: Optional[Dict] = None):
    partition_number: int = partition_info['number']
    df_part_len: int = len(df_part)
    ## Global offset: total number of rows in all partitions before this one.
    rows_before_this_partition: int = sum(partition_lens[:partition_number])
    row_uids = list(range(rows_before_this_partition, rows_before_this_partition + df_part_len))
    return df_part.assign(ROW_UID=row_uids)
partition_lens: List[int] = [
    x[1] for x in
    sorted(
        df.map_partitions(
            lambda df_part, partition_info: (partition_info['number'], len(df_part)),
            meta=tuple,
        ).compute(),
        key=lambda x: x[0],
    )
]
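Side note: to my knowledge, map_partitions returns results in partition order, in which case the lengths can be computed with the simpler one-liner below; treat this as an assumption and verify it on your Dask version:

## Assumed simpler alternative: one length per partition, in partition order.
partition_lens: List[int] = df.map_partitions(len).compute().tolist()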
df = df.map_partitions(
    set_unique_row_num,
    partition_lens=partition_lens,
    meta={**df.dtypes.to_dict(), 'ROW_UID': int},  ## NOTE! here we have 'ROW_UID' as an int
)
# assert df['ROW_UID'].nunique().compute() == len(df)
Output:
>>> df['ROW_UID'].partitions[3].compute()
189079 189079
189080 189080
189081 189081
189082 189082
189083 189083
...
252101 252101
252102 252102
252103 252103
252104 252104
252105 252105
Name: ROW_UID, Length: 63027, dtype: int64
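As a sanity check (a minimal sketch; assumes the dataframe fits in memory), ROW_UID should equal the positional row-index after calling .compute():

pdf = df.compute().reset_index(drop=True)
assert (pdf['ROW_UID'] == pdf.index).all()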