
I have a Dask DataFrame read from a CSV file with around 1 million records and 120 features/columns, and I would like to count the number of unique values in each column. I can clearly do it for each column separately using a for-loop:

from dask import dataframe as dd
dask_df = dd.read_csv("train.csv")
for col in dask_df.columns:
    print(dask_df[col].nunique().compute())

But calling compute at each iteration is very expensive (it took me around 40 minutes on a 3-node cluster with 5 workers, each worker having 2 GB of memory and 2 vcores), so is there a way to get the number of unique values for each column of the dataframe in one go? I have tried the dask_df.describe() API, but that gives unique counts only for string types. Any help appreciated, thanks in advance!

3 Answers


Here's another workaround, where the number of unique values for each column is calculated all at once, allowing more opportunity for optimization:

import random
import pandas
import dask
import dask.dataframe as dd

df = pandas.DataFrame({
    "x": [random.randint(0,100) for _ in range(100)],
    "y": [random.randint(0,100) for _ in range(100)],
    "z": [random.randint(0,100) for _ in range(100)],
})
ddf = dd.from_pandas(df, npartitions=10)

unique = {
    name: ddf[name].nunique()
    for name in ddf.columns
}

# traverse=True is default, but being explicit that we are traversing the dict for dask objects
dask.compute(unique, traverse=True)
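
Note that dask.compute() returns a tuple with one element per argument passed, so to get just the dict of per-column counts you can unpack the result, for example:

# dask.compute returns a tuple (one entry per positional argument),
# so the computed dict is its first and only element here:
(counts,) = dask.compute(unique, traverse=True)
print(counts)  # dict mapping column name -> number of unique values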
scj13

I don't know if this is the fastest solution, but you can use .melt() to unpivot your dataframe columns and then use .groupby() on the variable column to count the unique values in each group. This gives a significant performance improvement over your column-by-column solution:

dd.read_csv('test.csv').melt().groupby('variable')['value'].nunique().compute()
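
To see why this works: with no arguments, melt() stacks every column into a two-column frame, with the original column name in 'variable' and the cell contents in 'value', so grouping by 'variable' and taking nunique() of 'value' yields one distinct count per original column. A minimal pandas sketch of the reshaping (purely illustrative, on a made-up two-column frame):

import pandas as pd

small = pd.DataFrame({"a": [1, 1], "b": [2, 3]})
print(small.melt())
#   variable  value
# 0        a      1
# 1        a      1
# 2        b      2
# 3        b      3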

Let us generate some random integer data and save it as a CSV file:

import numpy as np
import pandas as pd
from dask import dataframe as dd

nrows = 10000
ncols = 120

rng = np.random.default_rng(seed=1)
random_data = rng.integers(low=0, high=nrows/2, size=(nrows,ncols))
pd.DataFrame(data=random_data).add_prefix('col_').to_csv('test.csv', index=False)

We use the following two functions for performance evaluation:

def nunique_per_column():
    dask_df = dd.read_csv('test.csv')
    counts = []
    for col in dask_df.columns:
        counts.append(dask_df[col].nunique().compute())
    return pd.Series(counts, index=dask_df.columns)

def melt_groupby_nunique():
    return dd.read_csv('test.csv').melt().groupby('variable')['value'].nunique().compute()

First check if both functions compute the same result with:

pd.testing.assert_series_equal(nunique_per_column().sort_index(),
                               melt_groupby_nunique().sort_index(),
                               check_names=False)

Running %timeit on the functions with the sample data yields the following output on my machine:

%timeit nunique_per_column()
17.5 s ± 216 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit melt_groupby_nunique()
1.78 s ± 576 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
joAschauer
  • When I tried running with melt(), the workers seem to consume a lot of memory, though there is a slight improvement in performance. What about using dask_df.compute().nunique()? It is faster than all of these (took ~30 seconds), but compute() will require the entire dataframe to fit in memory. – Mohamed Niyaz Sirajudeen Nov 14 '21 at 15:05
  • If your dataframe fits into memory you can use pd.DataFrame.nunique() after you call compute() (a sketch of this is shown below the comments). However, `nunique()` is not available for Dask dataframes, so you have to use groupby. Also have a look at [this question](https://stackoverflow.com/q/45922884/14333932). – joAschauer Nov 15 '21 at 13:31
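
For reference, the in-memory approach discussed in the comments above would look roughly like the following sketch. It is only viable when the whole dataframe fits into memory, since compute() materializes it as a single pandas DataFrame:

import dask.dataframe as dd

# Materialize everything as one pandas DataFrame, then use pandas' nunique();
# fast for small data, but requires the full dataframe to fit in memory.
unique_counts = dd.read_csv('test.csv').compute().nunique()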

@Mohamed As of Dask version 2022.01.0, DataFrame.nunique() has been implemented for Dask dataframes:

import random
import pandas
import dask.dataframe as dd

df = pandas.DataFrame({
    "x": [random.randint(0,100) for _ in range(100)],
    "y": [random.randint(0,100) for _ in range(100)],
    "z": [random.randint(0,100) for _ in range(100)],
})
ddf = dd.from_pandas(df, npartitions=10)

ddf.nunique().compute()
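
The computed result is a pandas Series indexed by column name. Applied to the CSV workflow from the question, this would look roughly like the sketch below (assuming the same train.csv path):

import dask.dataframe as dd

dask_df = dd.read_csv("train.csv")
# One graph for all columns; returns a pandas Series
# mapping each column name to its number of unique values.
unique_counts = dask_df.nunique().compute()
print(unique_counts)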
scj13