I need to add an id column for the rows in a Dask dataframe. The first thing I tried was to add a cumulative index, as shown in this other question:

df["idx"] = 1
df["idx"] = df["idx"].cumsum()

But my laptop crashed, so maybe a random unique id is an option instead.

As additional information, the file I'm using is 10 GB in Parquet format and 20 GB in CSV, and my laptop has 16 GB of RAM.

The other option, though I don't know if it's possible, is to just append the new column to the file without loading it into memory.

Luis Ramon Ramirez Rodriguez

2 Answers

I would figure out some code that does this for Pandas, and then use the map_partitions method to apply the same function in parallel. Maybe something like the following?

def add_unique_id_column(df: pandas.DataFrame) -> pandas.DataFrame:
    ...

df = df.map_partitions(add_unique_id_column)
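
For instance, a minimal sketch of the function body, assuming a random UUID per row is an acceptable id (the column name idx just mirrors the question):

import uuid

import pandas

def add_unique_id_column(df: pandas.DataFrame) -> pandas.DataFrame:
    # Give every row a random UUID string. No coordination between
    # partitions is needed, and collisions are vanishingly unlikely.
    return df.assign(idx=[str(uuid.uuid4()) for _ in range(len(df))])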
MRocklin

Here are two ways to create a sortable column ROW_UID in your Dask DataFrame.

Method 1 creates a string column ROW_UID which looks like "{partition_i}-{row_i}".

Method 2 creates an int64 column ROW_UID. The values are each row's index across the whole dataframe, i.e. the row index it would have if you called .compute().

Both methods rely on a small note at the end of the Dask docs for map_partitions:

Your map function gets information about where it is in the dataframe by accepting a special partition_info keyword argument.

def func(partition, partition_info=None):
    pass

This will receive the following information:

partition_info  
{'number': 1, 'division': 3}
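
For example, a quick way to see what each partition receives (a toy dataframe; the function name show_info is made up):

import dask.dataframe as dd
import pandas as pd

def show_info(partition, partition_info=None):
    print(partition_info)  ## e.g. {'number': 0, 'division': 0}
    return partition

ddf = dd.from_pandas(pd.DataFrame({'a': range(10)}), npartitions=3)
ddf.map_partitions(show_info, meta={'a': int}).compute()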

Method 1 (fast): set ROW_UID to be a unique string

Here, we just append the partition-number to a row-id, where row-id is unique for each partition:

import math
from typing import Dict, List

import pandas as pd

def get_num_zeros_to_pad(max_i: int) -> int:
    assert isinstance(max_i, int) and max_i >= 1
    num_zeros = math.ceil(math.log10(max_i))  ## Ref: https://stackoverflow.com/a/51837162/4900327
    if max_i == 10 ** num_zeros:  ## If it is a power of 10
        num_zeros += 1
    return num_zeros

def pad_zeros(i: int, max_i: int = None) -> str:
    assert isinstance(i, int) and i >= 0
    if max_i is None:
        return str(i)
    assert isinstance(max_i, int) and max_i >= i
    num_zeros: int = get_num_zeros_to_pad(max_i)
    return f'{i:0{num_zeros}}'

def set_unique_row_num(df_part: pd.DataFrame, npartitions: int, partition_info: Dict):
    df_part_len: int = len(df_part)
    part_id: str = pad_zeros(partition_info['number'], npartitions)
    row_uids = [f'{part_id}-{pad_zeros(row_i, df_part_len)}' for row_i in range(0, df_part_len)]
    return df_part.assign(ROW_UID=row_uids)

df = df.map_partitions(
    set_unique_row_num, 
    npartitions=df.npartitions, 
    meta={**df.dtypes.to_dict(), 'ROW_UID': str,}
)

# assert df['ROW_UID'].nunique().compute() == len(df)

Output:

>>> df['ROW_UID'].partitions[3].compute()

189079    003-00000
189080    003-00001
189081    003-00002
189082    003-00003
189083    003-00004
            ...    
252101    003-63022
252102    003-63023
252103    003-63024
252104    003-63025
252105    003-63026
Name: ROW_UID, Length: 63027, dtype: object

Method 2 (slow when data has not been persisted): set ROW_UID to be a unique number

This method gives us unique numeric row-ids across the dataframe.

E.g. if partition#0 has 5000 rows & partition#1 has 7000 rows, this will add a column ROW_UID, which takes values 0-4999 for partition#0, 5000-11999 for partition#1, etc.

This method requires computing the length of each partition and passing it to map_partitions. This can be slow if your dataframe has not yet been persisted, so make sure you call .persist() before running the following code.

Side note: this is equivalent to the pandas command df.reset_index(drop=True).reset_index().rename(columns={'index': 'ROW_UID'})
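
For a frame that fits in memory, you can sanity-check that equivalence directly (a toy example with made-up data):

import pandas as pd

pdf = pd.DataFrame({'a': [10, 20, 30]}, index=[5, 7, 9])
print(pdf.reset_index(drop=True).reset_index().rename(columns={'index': 'ROW_UID'}))
#    ROW_UID   a
# 0        0  10
# 1        1  20
# 2        2  30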

def set_unique_row_num(df_part: pd.DataFrame, partition_lens: List[int], partition_info: Dict):
    partition_number: int = partition_info['number']
    df_part_len: int = len(df_part)
    ## Global offset = total number of rows in all preceding partitions:
    rows_before_this_partition: int = sum(partition_lens[:partition_number])
    row_uids = list(range(rows_before_this_partition, rows_before_this_partition + df_part_len))
    return df_part.assign(ROW_UID=row_uids)

partition_lens: List[int] = [
    x[1] for x in
    sorted(list(df.map_partitions(
        lambda df_part, partition_info: (partition_info['number'], len(df_part)),
        meta=tuple,
    ).compute()), key=lambda x: x[0])
]
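
As an aside, since map_partitions preserves partition order, the same lengths can usually be fetched with less ceremony:

partition_lens: List[int] = df.map_partitions(len).compute().tolist()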

df = df.map_partitions(
    set_unique_row_num,
    partition_lens=partition_lens,
    meta={**df.dtypes.to_dict(), 'ROW_UID': int,}  ## NOTE! here we have 'ROW_UID' as an int
)

# assert df['ROW_UID'].nunique().compute() == len(df)

Output:

>>> df['ROW_UID'].partitions[3].compute()

189079    189079
189080    189080
189081    189081
189082    189082
189083    189083
           ...  
252101    252101
252102    252102
252103    252103
252104    252104
252105    252105
Name: ROW_UID, Length: 63027, dtype: int64
Abhishek Divekar