
I just tried doing a countDistinct over a window and got this error:

AnalysisException: u'Distinct window functions are not supported: count(distinct color#1926)

Is there a way to do a distinct count over a window in pyspark?

Here's some example code:

from pyspark.sql.window import Window    
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])
                    
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()

This is the output I'd like to see:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
Bob Swain

2 Answers


EDIT: As noleto mentions in his answer below, approx_count_distinct has been available since PySpark 2.1 and works over a window.


Original answer - exact distinct count (not an approximation)

We can use a combination of size and collect_set to mimic the functionality of countDistinct over a window:

from pyspark.sql import functions as F, Window

# Function to calculate number of seconds from number of days
days = lambda i: i * 86400

# Create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])
       
# Convert string timestamp to timestamp type             
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

# Create window by casting timestamp to long (number of seconds)
w = Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)

# Use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))

df.show()

This results in the distinct count of color over the previous week of records:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
Bob Swain
  • What if your `countDistinct` was between multiple columns? `collect_set` can only take a single column name. – Jon Deaton Sep 24 '18 at 21:48
  • It's a bit of a workaround, but one thing I've done is to create a new column that is a concatenation of the two columns. For example, if you have a firstname column and a lastname column, add a third column that is the two joined together, then run `collect_set` on that new column. – Bob Swain Sep 25 '18 at 12:55
  • Interesting. The workaround I have been using is to do a `groupBy` with `countDistinct` in the aggregation, followed by a `join` back to the original DataFrame that was grouped. I wonder which method is more efficient on large clusters? – Jon Deaton Sep 26 '18 at 03:33
  • I would think that adding a new column would use more RAM, especially if you're doing a lot of columns or the columns are large, but it wouldn't add much computational complexity. – Bob Swain Sep 26 '18 at 14:02
  • I have noticed performance issues when using `orderBy` without `partitionBy`: it moves all the data to a single partition. – Salman Ghauri Dec 02 '20 at 15:28

@Bob Swain's answer is nice and works! Since Spark 2.1, however, Spark offers an equivalent to the countDistinct function, approx_count_distinct, which is more efficient to use and, most importantly, supports counting distinct over a window.

Here is the code for the drop-in replacement:

#approx_count_distinct supports a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.approx_count_distinct("color").over(w))

For columns with small cardinality, the result is supposed to be the same as countDistinct. When the dataset grows a lot, you should consider adjusting the parameter rsd (the maximum estimation error allowed), which lets you tune the precision/performance trade-off.

noleto
  • result is supposed to be the same as "countDistinct" – any guarantees about that? If I use the default rsd = 0.05, does this mean that for cardinality < 20 it will return the correct result 100% of the time? – Kombajn zbożowy Mar 19 '20 at 11:46