
tl;dr

I want to

dd.read_parquet('*.parq')['column'].nunique().compute()

but I get

WARNING - Worker exceeded 95% memory budget. Restarting

a couple of times before the workers get killed altogether.


Long version

I have a dataset with

  • 10 billion rows,
  • ~20 columns,

and a single machine with around 200GB of memory. I am trying to use dask's LocalCluster to process the data, but my workers quickly exceed their memory budget and get killed, even when I use a reasonably small subset and basic operations.

I have recreated a toy problem demonstrating the issue below.

Synthetic data

To approximate the problem above on a smaller scale, I will create a single column with 32-character ids with

  • a million unique ids
  • total length of 200 million rows
  • split into 100 parquet files

The result will be

  • 100 files of 66MB each; a single file takes 178MB when loaded as a pandas dataframe (estimated by df.memory_usage(deep=True).sum())
  • Loaded as a single pandas dataframe, all the data take 20GB in memory
  • A single Series with all the unique ids (which is what I assume the workers also have to keep in memory when computing nunique) takes about 90MB

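As a back-of-envelope check on the ~90MB figure, one can estimate the Series footprint from CPython's string layout (a sketch; the exact overheads are implementation details of CPython):

```python
import sys

n_unique = int(1e6)

# One 32-character ASCII string object in CPython:
# roughly 49 bytes of overhead plus 1 byte per character
one_id = sys.getsizeof('a' * 32)

# An object-dtype Series additionally stores an 8-byte pointer per element
estimate_bytes = n_unique * (one_id + 8)
print(f'{estimate_bytes / 1e6:.0f}MB')
```

On a typical CPython build this comes out to roughly 90MB, in line with the estimate above.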
import string
import os

import numpy as np
import pandas as pd

chars = string.ascii_letters + string.digits

n_total = int(2e8)
n_unique = int(1e6)

# Create random 32-character ids; summing an object array of single
# characters concatenates them into strings
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)

outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)

# Sample from the ids to create 100 parquet files
for i in range(100):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')

Attempt at a solution

Let's assume that my machine only has 8GB of memory. Since the partitions take about 178MB and the result 90MB, according to Wes McKinney's rule of thumb (have 5-10x as much RAM as the size of your data), I might need up to 2-3GB of memory. Therefore, either

  • n_workers=2, memory_limit='4GB', or
  • n_workers=1, memory_limit='8GB'

seems like a good choice. Sadly, when I try it, I get

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

a couple of times, before the worker(s) get killed altogether.

import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)

dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()

In fact, it seems that even with 4 workers, each needs 6GB of memory before being able to perform the task.

Can this situation be improved?

SultanOrazbayev
Dahn

1 Answer


That's a great example of a recurring problem. The only shocking thing is that delayed was not used during the synthetic data creation:

import dask
@dask.delayed
def create_sample(i):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')    
    return

# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)

For the following answer I will actually just use a small number of partitions (so change to range(5)), to have sane visualizations. Let's start with the loading:

df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), columns=['id'])
print(df.npartitions) # 5

This is a minor point, but having columns=['id'] in .read_parquet() exploits the parquet advantage of columnar extraction (dask might do some optimization behind the scenes, but if you know the columns you want, there's no harm in being explicit).

Now, when you run df['id'].nunique(), here's the DAG that dask will compute:

[image: DAG of the df['id'].nunique() computation]

With more partitions there would be more steps, but the potential bottleneck is apparent: every partition sends its intermediate result to a single receiving worker. These intermediates can be very large for high-cardinality columns, so if each partition sends an object of 100MB, the receiving worker needs 5 times that memory just to accept the data (the total could shrink again after further value-counting).
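The fan-in can be illustrated in plain Python with toy sizes (sys.getsizeof here measures only each set's own hash table, not the strings it contains, but the multiplier is the point):

```python
import sys

# Five hypothetical partitions, each already reduced to a set of unique ids
partition_sets = [{f'{p}-{i:06d}' for i in range(100_000)} for p in range(5)]

one_result = sys.getsizeof(partition_sets[0])

# The receiving worker must hold all five intermediates plus their union
combined = set().union(*partition_sets)
peak = sum(sys.getsizeof(s) for s in partition_sets) + sys.getsizeof(combined)

print(f'one intermediate: {one_result / 1e6:.1f}MB, fan-in peak: {peak / 1e6:.1f}MB')
```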

An additional consideration is how many tasks a single worker can run at a given time. The easiest way to control this is resources. If you initiate the cluster with resources:

cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})

Then every worker has the specified resources (in this case it's 1 unit of arbitrary foo), so if you think that processing a single partition should happen one at a time (due to high memory footprint), then you can do:

# note, no split_every is needed in this case since we're just 
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})

This will ensure that any single worker is busy with at most 1 task at a time, preventing excessive memory usage. (Side note: there's also .nunique_approx(), which may be of interest.)

To control the amount of data that any given worker receives for further processing, one approach is to use split_every option. Here's what the DAG will look like with split_every=3:

[image: DAG of the same computation with split_every=3]

You can see that now (for this number of partitions), the most a worker needs to hold at once is 3 of the intermediate results, rather than all of them. So depending on your worker memory settings, you might want to set split_every to a low value (2, 3, 4 or so).

In general, the more unique values the variable has, the more memory each partition's object of unique values needs, so a lower value of split_every is useful to put a cap on the maximum memory usage. If the variable has only a few unique values, then each individual partition's set of uniques will be a small object, and there's no need for a split_every restriction.
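For intuition, the tree reduction that split_every produces can be sketched in plain Python (a toy model, not dask's actual implementation):

```python
import random

def tree_nunique(partition_sets, split_every=3):
    """Merge per-partition sets of unique values in groups of `split_every`,
    so no merge step ever holds more than `split_every` intermediates."""
    level = partition_sets
    while len(level) > 1:
        level = [set().union(*level[i:i + split_every])
                 for i in range(0, len(level), split_every)]
    return len(level[0])

# Toy data: 10 "partitions" sampling from 50 possible ids
random.seed(0)
partitions = [{f'id-{random.randrange(50)}' for _ in range(30)} for _ in range(10)]

exact = len(set().union(*partitions))
assert tree_nunique(partitions, split_every=3) == exact
```

In dask itself this corresponds to passing the keyword directly, e.g. df['id'].nunique(split_every=3).compute().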

SultanOrazbayev
  • Thank you, this was very illuminating. Sadly even `split_every=2` does not seem to improve the situation though, workers are restarted and killed even when they have gigabytes of memory each. I have the same problem if I try `@dask.delayed` for the data synthesis, btw. – Dahn Mar 19 '21 at 13:30
  • Ah, sorry, what I wrote is actually about a different problem (I was thinking about `.value_counts()`). One sec, I will update the answer for the `.nunique()`. – SultanOrazbayev Mar 19 '21 at 13:50
  • Done, you might also find these useful: https://stackoverflow.com/a/65964643/10693596 and https://stackoverflow.com/a/66166430/10693596 – SultanOrazbayev Mar 19 '21 at 14:03
  • Thank you, I set the resources in both `LocalCluster` and `compute` to `n_workers*memory_limit` and I no longer run out of memory! The task stream now looks quite unhealthy (mostly white), but I suppose that's to be expected and hopefully possible to tweak a bit, too. Final question: you mentioned that `nunique` just sends a number of unique around. I would rather expect it to send a list of unique ids around, otherwise how would the other workers know how to combine the results? – Dahn Mar 19 '21 at 15:01
  • @Dahn, you are absolutely right. I will need to update the answer. :) – SultanOrazbayev Mar 19 '21 at 15:09
  • I think I may be misunderstanding the definition of resources. Is it that in `LocalCluster`, it's the resources available for a single worker, and in `compute` I am saying how much resources a single partition for that task requires? – Dahn Mar 19 '21 at 15:37
  • So if a worker has `foo`=10 (in cluster definition), and a task requires 1 `foo`, then a single worker can run 10 tasks at one time. – SultanOrazbayev Mar 19 '21 at 16:04
  • Just to be precise: you mean a single partition of the computation, not the computation as a whole, correct? – Dahn Mar 19 '21 at 16:04
  • By `task` I'm referring to an element of the overall DAG (not the complete DAG). – SultanOrazbayev Mar 19 '21 at 16:06
  • Okay, I will now try to review all this and the documentation and see if I got it all. Thank you for all the help! – Dahn Mar 19 '21 at 16:12