I am using Dask for a project, and I have managed to get it working by following some golden rules:
- Write native pandas code, but wrap it in map_partitions
- Set the index on the columns you want to merge or group by
- Partition the data so that each partition is smaller than the memory of a single worker
- Avoid large task graphs (I save intermediate results at sensible points, e.g. right after setting an index); a minimal sketch of how I apply these rules is shown right after this list
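For reference, this is roughly the pattern I follow. It is only a sketch: the path some_table, the helper add_features, and the 100MB target are placeholders and are not part of the failing script below.

    import numpy as np
    import dask.dataframe as dd

    def add_features(pdf):
        # plain pandas code, applied independently to each partition
        pdf["col3_log"] = np.log(pdf["col3"])
        return pdf

    ddf = dd.read_parquet("some_table")            # hypothetical input
    ddf = ddf.repartition(partition_size="100MB")  # keep partitions well below worker memory
    ddf = ddf.set_index("col1")                    # index the column used for merges/group-bys
    ddf.to_parquet("some_table_indexed")           # persist to disk so downstream work starts from a short graph
    ddf = dd.read_parquet("some_table_indexed")
    ddf = ddf.map_partitions(add_features)         # native pandas code wrapped in map_partitions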
However, when I put together a summary of everything that has worked for me so far, using random data, the workers exceed their memory limits and restart even though the data is smaller than my total memory, and eventually my machine itself runs out of memory as well.
import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
def create_files():
    size = 1_000_000
    for i in range(100):
        df = pd.DataFrame({
            "col1": np.random.randint(90_000, 100_000, size),
            "col2": np.random.randint(101, 20_000, size),
            "col3": np.random.uniform(0, 10_000, size),
        })
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet("test", ignore_divisions=True, engine="fastparquet", overwrite=i == 0, append=i > 0)
    print("Created first file")

    for i in range(10):
        df = pd.DataFrame({
            "col1": np.random.randint(90_000, 100_000, size),
            "col4": np.random.uniform(0, 10_000, size),
        })
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet("test2", ignore_divisions=True, engine="fastparquet", overwrite=i == 0, append=i > 0)
    print("Created second file")
    print("-------------------------------------------------------------")
def index_repartition():
    print("About to repartition")
    ddf = dd.read_parquet("test")
    ddf = ddf.repartition(npartitions=100)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet")
    ddf.to_parquet("test")

    ddf = dd.read_parquet("test2")
    ddf = ddf.repartition(npartitions=50)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet 2")
    ddf.to_parquet("test2")
# https://stackoverflow.com/questions/52642966/repartition-dask-dataframe-to-get-even-partitions
def _rebalance_ddf(ddf):
    """Repartition a dask dataframe so that partitions are roughly equal in size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)
def main():
    ddf = dd.read_parquet("test")
    print(ddf.compute())
    print(ddf.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf.memory_usage(deep=True).sum().compute())

    ddf2 = dd.read_parquet("test2")
    print(ddf2.memory_usage_per_partition(index=True, deep=False).compute())
    print(ddf2.memory_usage(deep=True).sum().compute())

    def mapped_fun(data):
        for lag in range(4):
            data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
        return data

    ddf = ddf.map_partitions(mapped_fun)
    ddf = ddf.merge(ddf2, on=["col1"], how="left")
    ddf.to_parquet("result", engine="fastparquet")
if __name__ == "__main__":
    cluster = LocalCluster(
        n_workers=4,
        threads_per_worker=2,
        memory_limit="auto",
    )
    client = Client(cluster)

    create_files()
    index_repartition()
    main()
How is this possible? I am really confused, because the big dataset (ddf) is 2.4 GB, all of which should fit comfortably into my 16 GB of memory, and each partition is about 23 MB, far below the roughly 4 GB allocated to each worker. I read that if the column used for merging has a lot of repeated values, the merge can produce very large results (Merge large datasets with dask), but I have given the index quite a large range of values and the problem does not go away.
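In case it matters, this is the kind of sanity check I have been considering to see whether repeated join keys multiply the row count of the merge. It is only a sketch, and the names left_counts, right_counts, and estimated_rows are mine:

    # For keys present on both sides, a merge produces
    # (rows on the left) * (rows on the right) output rows per key,
    # so repeated keys on both sides can blow the result up.
    left_counts = ddf["col1"].value_counts().compute()
    right_counts = ddf2["col1"].value_counts().compute()
    estimated_rows = (left_counts * right_counts).dropna().sum()
    print(f"left rows: {len(ddf):,}, estimated merged rows: {int(estimated_rows):,}")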