
I want to create features (additional columns) from a dataframe, and I have the following structure for many functions.

Following the documentation at https://docs.dask.org/en/stable/delayed-best-practices.html, I have come up with the code below.

However, I get the error concurrent.futures._base.CancelledError, and I often also get the warning: distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)

I understand that the object I am appending to delay is very large (it works fine when I use the commented-out df), which is why the program crashes. Is there a better way of doing it?

import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd 
import numpy as np
import dask


def main():
    #df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000), "col2": np.random.randint(101, 200, 100000), "col3": np.random.uniform(0, 4, 100000)})
    df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000000), "col2": np.random.randint(101, 200, 100000000), "col3": np.random.uniform(0, 4, 100000000)})

    ddf = dd.from_pandas(df, npartitions=100)

    ddf = ddf.set_index("col1")
    delay = []

    def create_col_sth():
        
        group = ddf.groupby("col1")["col3"]
        
        @dask.delayed
        def small_fun(lag):
            return f"col_{lag}", group.transform(lambda x: x.shift(lag), meta=('x', 'float64')).apply(lambda x: np.log(x), meta=('x', 'float64'))


        for lag in range(5):
            x = small_fun(lag)
            delay.append(x)
        
    create_col_sth()    
    delayed = dask.compute(*delay)

    for data in delayed:
        ddf[data[0]] = data[1]
        
    ddf.to_parquet("test", engine="fastparquet")


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=6, 
                    threads_per_worker=2,
                    memory_limit='8GB')
    client = Client(cluster)
    main()

J.Ewa

2 Answers


Not sure if this will resolve all of your issues, but in general you don't need to (and shouldn't) mix delayed and dask.dataframe operations like this. Additionally, you shouldn't pass large data objects into delayed functions through closures, as happens with group in your example. Instead, pass them in as explicit arguments, or in this case don't use delayed at all and rely on native dask.dataframe operations or in-memory (pandas) operations via dask.dataframe.map_partitions.

Implementing these, I would rewrite your main function as follows:

df = pd.DataFrame({
    "col1": np.random.randint(1, 100, 100000000),
    "col2": np.random.randint(101, 200, 100000000),
    "col3": np.random.uniform(0, 4, 100000000),
})

ddf = dd.from_pandas(df, npartitions=100)
ddf = ddf.set_index("col1")

group = ddf.groupby("col1")["col3"]

# directly assign the dataframe operations as columns
for lag in range(5):
    # bind lag via a default argument: the lambda runs lazily at compute time,
    # so a plain closure over the loop variable would see only its final value
    ddf[f"col_{lag}"] = (
        group
        .transform(lambda x, lag=lag: x.shift(lag), meta=('x', 'float64'))
        .apply(lambda x: np.log(x), meta=('x', 'float64'))
    )

# this triggers the operation implicitly - no need to call compute
ddf.to_parquet("test", engine="fastparquet")
Michael Delgado

After long periods of frustration with Dask, I think I have found the holy grail of refactoring pandas transformations wrapped with Dask.

Learning points:

  1. Index intelligently. If you group by or merge on certain columns, consider setting those columns as the index.

  2. Partition and repartition intelligently. A dataframe of 10k rows and another of 1m rows naturally call for different numbers of partitions.

  3. Don't use Dask DataFrame transformation methods, with a few exceptions such as merge. Write the rest as plain pandas code wrapped in map_partitions.

  4. Don't let the task graph grow too large, so consider saving intermediate results, for example after indexing or after a complex transformation.

  5. If possible, filter the dataframe and work with a smaller subset; you can always merge it back into the bigger dataset later.

  6. If you are working on your local machine, set the worker memory limits within the boundaries of your system's specifications. This point is very important. In the example below I create one million rows across 3 numeric columns (int64 and float64, each 8 bytes wide), so 24 bytes per row and roughly 24 million bytes in total; see the small check right after this list.
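
As a quick sanity check of the arithmetic in point 6 (my own illustration, separate from the answer's pipeline below; it assumes numpy gives you 8-byte integer columns, which is the default on most 64-bit platforms):

import numpy as np
import pandas as pd

size = 1_000_000
df = pd.DataFrame({
    "col1": np.random.randint(1, 10000, size),
    "col2": np.random.randint(101, 20000, size),
    "col3": np.random.uniform(0, 100, size),
})

# three numeric columns at 8 bytes per value -> 24 bytes per row
print(size * 3 * 8)                                   # 24000000 bytes, roughly 24 MB
print(df.memory_usage(index=False, deep=True).sum())  # should agree on 64-bit platforms

With that in mind, here is the full example: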

import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import dask
import gc


# https://stackoverflow.com/questions/52642966/repartition-dask-dataframe-to-get-even-partitions
def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)


def main(client):
    size = 1000000

    df = pd.DataFrame({"col1": np.random.randint(1, 10000, size), "col2": np.random.randint(101, 20000, size), "col3": np.random.uniform(0, 100, size)})

    # Select appropriate partitions
    ddf = dd.from_pandas(df, npartitions=500)
    del df
    gc.collect()
    # If you want to group by a certain column, it is always best to make that column the index
    ddf = ddf.set_index("col1")

    ddf = _rebalance_ddf(ddf)
    print(ddf.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf.memory_usage(deep=True).sum().compute())

    # Always persist your data (here: write to parquet and read it back) to prevent big task graphs; if you omit this step, processing will fail
    ddf.to_parquet("test", engine="fastparquet")
    
    ddf = dd.read_parquet("test")

    
    # Dummy code to create a dataframe to be merged based on col1
    ddf2 = ddf[["col2", "col3"]]
    ddf2["col2/col3"] = ddf["col2"] / ddf["col3"] 
    ddf2 = ddf2.drop(columns=["col2", "col3"])
    
    # Repartition the data
    ddf2 = _rebalance_ddf(ddf2)
    print(ddf2.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf2.memory_usage(deep=True).sum().compute())

    def mapped_fun(data):
        for lag in range(5):
            data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
        return data

    # Do the groupby transformation in pandas, wrapped with Dask via map_partitions; using the Dask
    # groupby methods directly for this leads to a variety of issues.
    ddf = ddf.map_partitions(mapped_fun)

    # Additionally, you can merge ddf with ddf2, but do it on an indexed column; otherwise you run into a variety of issues
    ddf = ddf.merge(ddf2, on=['col1'], how="left")

    ddf.to_parquet("final", engine="fastparquet")


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=6, 
                    threads_per_worker=2,
                    memory_limit='8GB')
    client = Client(cluster)
    main(client)
J.Ewa