tl;dr
I want to
dd.read_parquet('*.parq')['column'].nunique().compute()
but I get
WARNING - Worker exceeded 95% memory budget. Restarting
a couple of times before the workers get killed altogether.
Long version
I have a dataset with
- 10 billion rows,
- ~20 columns,
and a single machine with around 200GB of memory. I am trying to use Dask's LocalCluster
to process the data, but my workers quickly exceed their memory budget and get killed, even when I use a reasonably small subset of the data and try only basic operations.
I have recreated a toy problem demonstrating the issue below.
Synthetic data
To approximate the problem above on a smaller scale, I will create a single column of 32-character ids with
- a million unique ids
- total length of 200 million rows
- split into 100 parquet files
The result will be
- 100 files, 66MB each, taking 178MB when loaded as a Pandas dataframe (estimated by df.memory_usage(deep=True).sum())
- If loaded as a pandas dataframe, all the data take 20GB in memory
- A single Series with all the unique ids (which is what I assume the workers also have to keep in memory when computing nunique) takes about 90MB
import string
import os
import numpy as np
import pandas as pd
chars = string.ascii_letters + string.digits
n_total = int(2e8)
n_unique = int(1e6)
# Create n_unique random 32-character ids (summing the object array concatenates the characters in each row)
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)
outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)
# Sample from the ids to create 100 parquet files
for i in range(100):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
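To double-check the size estimates listed above, here is a quick sketch (not part of the original setup); it assumes the generation script above has just been run, so outputdir, ids and the imports are still in scope, and the exact numbers will vary slightly with the random data.
# Size of one parquet file on disk (~66MB)
print(os.path.getsize(os.path.join(outputdir, 'test-000.snappy.parq')) / 1e6)
# One partition loaded as a pandas dataframe (~178MB including string overhead)
part = pd.read_parquet(os.path.join(outputdir, 'test-000.snappy.parq'))
print(part.memory_usage(deep=True).sum() / 1e6)
# A Series holding just the one million unique ids (~90MB)
print(pd.Series(ids).memory_usage(deep=True) / 1e6)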
Attempt at a solution
Let's assume that my machine only has 8GB of memory. Since the partitions take about 178MB each and the result about 90MB, Wes McKinney's rule of thumb (have 5-10x as much RAM as the size of your dataset) suggests I might need up to 2-3GB of memory. Therefore, either n_workers=2, memory_limit='4GB', or n_workers=1, memory_limit='8GB' seems like a good choice. Sadly, when I try it, I get
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
a couple of times, before the worker(s) get killed altogether.
import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)
dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()
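For completeness, the two configurations from the estimate above differ only in the LocalCluster arguments; a sketch of how they were set up (the read and nunique call stays the same):
# Both configurations stay within the assumed 8GB machine,
# and both fail with the memory warnings shown above.
cluster = LocalCluster(n_workers=2, memory_limit='4GB')
# or
cluster = LocalCluster(n_workers=1, memory_limit='8GB')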
In fact, it seems that with 4 workers, for example, they each need 6GB of memory before being able to perform the task.
Can this situation be improved?