
I am using dask for a project and I have managed to get it working by following some golden rules (a short sketch of how I apply them follows the list):

  • Use your native pandas code, but wrap it in map_partitions
  • Index the columns you want to use for merges or group-bys
  • Partition your data so that each partition is smaller than your worker memory
  • Avoid large task graphs (I try to save to disk intelligently, e.g. after setting an index)
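
To make these rules concrete, here is a minimal, standalone sketch; the column names, file name and the 100MB target are illustrative, not from my real project:

import pandas as pd
import numpy as np
import dask.dataframe as dd

# Illustrative data; "key"/"value" are made-up column names
pdf = pd.DataFrame({"key": np.random.randint(0, 100, 10_000),
                    "value": np.random.uniform(0, 1, 10_000)})
ddf = dd.from_pandas(pdf, npartitions=10)

# Rule 1: keep the pandas logic, run it per partition with map_partitions
def add_log(df):
    df = df.copy()
    df["log_value"] = np.log1p(df["value"])
    return df

ddf = ddf.map_partitions(add_log)

# Rule 2: set the index on the merge/groupby key so dask knows the divisions
ddf = ddf.set_index("key")

# Rule 3: target partition sizes well below worker memory
ddf = ddf.repartition(partition_size="100MB")

# Rule 4: persist intermediate results to disk to keep the task graph small
ddf.to_parquet("indexed_data")
ddf = dd.read_parquet("indexed_data")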

However, while putting together a summary of everything that has worked for me so far, using random data, I find that even for data smaller than my total memory the workers exceed their memory limits and restart, and this eventually spills over to my computer, which also runs out of memory.

import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd 
import numpy as np


def create_files():
    # Write 100 one-million-row chunks, appended into a single parquet dataset
    size = 1_000_000
    for i in range(100):
        df = pd.DataFrame({"col1": np.random.randint(90_000, 100_000, size), "col2": np.random.randint(101, 20_000, size), "col3": np.random.uniform(0, 10_000, size)})
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet("test", ignore_divisions=True, engine="fastparquet", overwrite=(i == 0), append=(i > 0))

    print("Created first file")
    # Write 10 one-million-row chunks for the smaller lookup dataset
    for i in range(10):
        df = pd.DataFrame({"col1": np.random.randint(90_000, 100_000, size), "col4": np.random.uniform(0, 10_000, size)})
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet("test2", ignore_divisions=True, engine="fastparquet", overwrite=(i == 0), append=(i > 0))
    print("Created second file")
    print("-------------------------------------------------------------")
def index_reparttion():
    print("About to repartition")
    ddf = dd.read_parquet("test")
    ddf = ddf.repartition(npartitions=100)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet")
    ddf.to_parquet("test")
    

    ddf = dd.read_parquet("test2")
    ddf = ddf.repartition(npartitions=50)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet 2")
    ddf.to_parquet("test2")
        
# https://stackoverflow.com/questions/52642966/repartition-dask-dataframe-to-get-even-partitions
def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)


def main():
    ddf = dd.read_parquet("test")
    print(ddf.compute())
    print(ddf.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf.memory_usage(deep=True).sum().compute())
    
    ddf2 = dd.read_parquet("test2")
    print(ddf2.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf2.memory_usage(deep=True).sum().compute())
    def mapped_fun(data):
        # Per-partition feature engineering: lagged, log-transformed col3 within each col1 group
        for lag in range(4):
            data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
        return data

    ddf = ddf.map_partitions(mapped_fun)
    
    # Left-join the small dataset's col4 onto the big one on the shared key col1
    ddf = ddf.merge(ddf2, on=['col1'], how="left")

    ddf.to_parquet("result", engine="fastparquet")


if __name__ == "__main__":
    cluster = LocalCluster(
                    n_workers=4,
                    threads_per_worker=2,
                    memory_limit='auto'  # splits total system memory across the 4 workers
                    )
    client = Client(cluster)
    create_files()
    index_repartition()
    main()

How is this possible? I am really confused because the big dataset (ddf) is 2.4 GB, all of which fits comfortably into 16 GB of memory, and each partition is about 23 MB, which is far less than the 4 GB allocated to each worker. I read that if there is a lot of repetition in the column used for merging, pandas can generate very large intermediate results (Merge large datasets with dask). I have given the index quite a large range of values, but the problem doesn't disappear.
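To illustrate that point about repetition in the merge column, here is a toy pandas example (not my actual data): if a key appears m times on the left and n times on the right, the merge produces m x n rows for that key, so the output can be far larger than either input.

import pandas as pd
import numpy as np

# Toy example: two keys, each repeated 1000 times on both sides
left = pd.DataFrame({"col1": np.repeat([1, 2], 1_000), "col3": np.arange(2_000)})
right = pd.DataFrame({"col1": np.repeat([1, 2], 1_000), "col4": np.arange(2_000)})

merged = left.merge(right, on="col1", how="left")
print(len(left), len(right), len(merged))    # 2000 2000 2000000
print(merged.memory_usage(deep=True).sum())  # much larger than either input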

  • Cross post: https://dask.discourse.group/t/dask-running-out-of-memory-even-when-partitions-are-way-less-than-worker-memory/1276 – mdurant Nov 02 '22 at 16:55
  • I found my problem. I was merging on columns with many-to-many relationships, generating an m x n blow-up, so that a 2 MB partition merged with another 2 MB partition becomes about 900 MB. I identified this by running the Python profiler on a single partition and then processing it with dask. @mdurant I'll fix the cross post. – J.Ewa Nov 02 '22 at 20:40
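
To reproduce the kind of check described in that comment, one can pull a single partition from each side and redo the merge in plain pandas, then compare memory footprints. A rough sketch, using the file names from the script above:

import dask.dataframe as dd

# Pull one indexed partition from each dataset into pandas
left = dd.read_parquet("test").partitions[0].compute().reset_index()
right = dd.read_parquet("test2").partitions[0].compute().reset_index()

print("left MB:  ", left.memory_usage(deep=True).sum() / 1e6)
print("right MB: ", right.memory_usage(deep=True).sum() / 1e6)

# Repeat the merge for just this pair of partitions and see how it grows
merged = left.merge(right, on="col1", how="left")
print("merged MB:", merged.memory_usage(deep=True).sum() / 1e6)
print("rows:", len(left), "->", len(merged))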

0 Answers