43

The question is pretty much in the title: Is there an efficient way to count the distinct values in every column in a DataFrame?

The describe method provides only the count, not the distinct count. Is there a way to get the distinct count for all (or some selected) columns?

Rami

6 Answers

75

In PySpark you could do something like this, using countDistinct():

from pyspark.sql.functions import col, countDistinct

df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))

Similarly in Scala:

import org.apache.spark.sql.functions.countDistinct
import org.apache.spark.sql.functions.col

df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*)

If you want to speed things up at the potential loss of accuracy, you could also use approx_count_distinct() (named approxCountDistinct() before Spark 2.1).

mtoto
  • can you please explain what's the `*` for here (in your pyspark solution)? – Peybae Sep 05 '18 at 20:19
  • The star operator in Python can be used to unpack the arguments from the iterator for the function call, also see [here](https://docs.python.org/3/tutorial/controlflow.html#unpacking-argument-lists). – mtoto Sep 05 '18 at 20:43
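
To illustrate the star operator from the comment above without needing Spark, here is a minimal sketch in plain Python. The `agg` function is a hypothetical stand-in for `df.agg`, not the real API; it only collects whatever positional arguments it receives, and the strings stand in for column expressions.

```python
def agg(*exprs):
    """Hypothetical stand-in for df.agg: returns its positional arguments."""
    return list(exprs)


columns = ["col1", "col2", "col3"]

# Without the star, the whole generator is passed as one single argument.
single = agg(f"count_distinct({c})" for c in columns)

# With the star, the generator is unpacked into one argument per column.
unpacked = agg(*(f"count_distinct({c})" for c in columns))

print(len(single))    # 1
print(len(unpacked))  # 3
```

This is why the PySpark answer writes `df.agg(*(...))`: the aggregation is meant to receive one expression per column rather than a single generator object.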
42

Multiple aggregations would be quite expensive to compute. I suggest that you use approximation methods instead, in this case an approximate distinct count:

val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3")

val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap
df.agg(exprs).show()
// +---------------------------+---------------------------+---------------------------+
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)|
// +---------------------------+---------------------------+---------------------------+
// |                          2|                          2|                          3|
// +---------------------------+---------------------------+---------------------------+

The approx_count_distinct method relies on HyperLogLog under the hood.

The HyperLogLog algorithm and its variant HyperLogLog++ (implemented in Spark) rely on the following clever observation.

If the numbers are spread uniformly across a range, then the count of distinct elements can be approximated from the largest number of leading zeros in the binary representation of the numbers.

For example, if we observe a number whose digits in binary form are of the form 0…(k times)…01…1, then we can estimate that there are in the order of 2^k elements in the set. This is a very crude estimate but it can be refined to great precision with a sketching algorithm.

A thorough explanation of the mechanics behind this algorithm can be found in the original paper.
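
The leading-zeros observation can be sketched in a few lines of plain Python. This is only the crude single-estimator idea described above, not Spark's HyperLogLog++ implementation; the function names are hypothetical, and using the first 32 bits of SHA-256 as the hash is an assumption made here just to spread the values roughly uniformly.

```python
import hashlib

HASH_BITS = 32  # assumption: keep only the first 32 bits of each hash


def max_leading_zeros(values):
    """Largest number of leading zero bits seen among the hashed values."""
    best = 0
    for v in values:
        digest = hashlib.sha256(str(v).encode("utf-8")).digest()
        h = int.from_bytes(digest[:HASH_BITS // 8], "big")
        # bit_length() drops leading zeros, so the difference counts them.
        best = max(best, HASH_BITS - h.bit_length())
    return best


def crude_distinct_estimate(values):
    """Crude estimate: about 2^k distinct values if k leading zeros were seen."""
    return 2 ** max_leading_zeros(values)


data = list(range(1000))
print(crude_distinct_estimate(data))
# Duplicates hash to the same bits, so repeating the data changes nothing.
print(crude_distinct_estimate(data * 5))
```

A single estimator like this is always a power of two and can be far off. The sketching step mentioned above refines it by keeping many such maxima (one per bucket of hash values) and combining them.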

Note: Starting with Spark 1.6, when Spark runs SELECT SOME_AGG(DISTINCT foo), SOME_AGG(DISTINCT bar) FROM df, each clause should trigger a separate aggregation. This differs from SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df, where we aggregate only once. Thus the performance of count(distinct(_)) and approxCountDistinct (or approx_count_distinct) won't be comparable.

This is one of the behavior changes since Spark 1.6:

With the improved query planner for queries having distinct aggregations (SPARK-9241), the plan of a query having a single distinct aggregation has been changed to a more robust version. To switch back to the plan generated by Spark 1.5’s planner, please set spark.sql.specializeSingleDistinctAggPlanning to true. (SPARK-12077)

Reference: Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles.

eliasah
  • Just a caveat: Note that for columns where almost every value is unique, approx_count_distinct might give up to 10% error in the default configuration and might actually take the same time as count_distinct. It might even return a value that is higher than the actual row count. – Thomas Mar 23 '18 at 11:04
  • That's correct but the bigger your dataset is, the lower that error is. – eliasah Mar 23 '18 at 15:42
  • how do you do this in PySpark? – Ross Brigoli Mar 11 '22 at 07:57
21

If you just want the distinct count for a particular column, the following could help. It's a late answer, but it might help someone (tested with PySpark 2.2.0).

from pyspark.sql.functions import col, countDistinct
df.agg(countDistinct(col("colName")).alias("count")).show()
desaiankitb
  • If you want the answer in a variable, rather than displayed to the user, replace the `.show()` with `.collect()[0][0]` – Sarah Messer Dec 01 '20 at 14:44
9

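Adding to desaiankitb's answer, this gives a more intuitive result, showing how many rows each distinct value has:

# One output row per distinct value of the column, with its row count;
# the number of rows in the result is the distinct count.
df.groupBy("colName").count().show()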
thegooner
1

You can use the SQL count(DISTINCT column_name) function (a plain count(column_name) counts the non-null rows, not the distinct values).

Alternatively, if you are doing data analysis and want a rough estimate rather than an exact count for each column, you can use the approx_count_distinct function: approx_count_distinct(expr[, relativeSD])

Cpt Kitkat
0

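This is one way to create a dataframe with the distinct count of every column:

# Convert to a pandas-on-Spark dataframe to get the pandas-style nunique()
df = df.to_pandas_on_spark()
collect_df = []
for i in df.columns:
    collect_df.append({"field_name": i, "unique_count": df[i].nunique()})
# One row per column: its name and its number of distinct values
uniquedf = spark.createDataFrame(collect_df)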

The output would look like below. I used this with another dataframe to compare values where the column names are the same. The other dataframe was created the same way and then joined.

df_prod_merged = uniquedf1.join(uniquedf2, on='field_name', how="left")

This is an easy way to do it. It might be expensive on very large data (around 1 TB), but it is still very efficient when using to_pandas_on_spark().

enter image description here

Jha Ayush