79

I would like to calculate group quantiles on a Spark dataframe (using PySpark). Either an approximate or exact result would be fine. I prefer a solution that I can use within the context of groupBy / agg, so that I can mix it with other PySpark aggregate functions. If this is not possible for some reason, a different approach would be fine as well.

This question is related but does not indicate how to use approxQuantile as an aggregate function.

I also have access to the percentile_approx Hive UDF but I don't know how to use it as an aggregate function.

For the sake of specificity, suppose I have the following dataframe:

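from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Creating a SparkSession is what makes toDF() available on RDDs.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext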

df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()

Expected result:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+
ZygD
abeboparebop
  • I think you might be able to roll your own in this instance using the underlying rdd and an algorithm for computing distributed quantiles e.g. [here](https://dataorigami.net/blogs/napkin-folding/19055451-percentile-and-quantile-estimation-of-big-data-the-t-digest) and links therein. In fact, the github they link to has some pyspark examples. – ags29 Oct 20 '17 at 11:22
  • [How to use approxQuantile by group?](https://stackoverflow.com/q/53548964/6910411) – zero323 Jan 14 '19 at 00:15

8 Answers

152

I guess you don't need it anymore, but I will leave it here for future generations (i.e. me next week when I forget).

from pyspark.sql import Window
import pyspark.sql.functions as F

grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(val, 0.5)')

df.withColumn('med_val', magic_percentile.over(grp_window))

Or to address exactly your question, this also works:

df.groupBy('grp').agg(magic_percentile.alias('med_val'))

And as a bonus, you can pass an array of percentiles:

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

And you'll get a list in return.

Chris A.
kael
  • Very clean answer. Do you know how it can be done using Pandas UDF (a.k.a. Vectorized UDFs) too? – ciurlaro Nov 06 '19 at 16:18
  • @CesareIurlaro, I've only wrapped it in a UDF. Never tried with a Pandas one – kael Nov 07 '19 at 14:14
  • Would you mind trying? Performance really should shine there: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html – ciurlaro Nov 10 '19 at 09:48
  • With Spark 3.1.0 it is now possible to use `percentile_approx` directly in PySpark groupby aggregations: `df.groupBy("key").agg(percentile_approx("value", 0.5, lit(1000000)).alias("median"))` https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.percentile_approx.html – 00schneider Mar 04 '21 at 09:58
  • @kael or @chris-a can we get 00schneider's comment added to this answer? This is the simplest & best solution now that it is built in to `pyspark.sql.functions` – Aaron Robeson Mar 15 '21 at 14:49
  • What is the difference between using `expr('percentile(val, 0.5)')` and `expr('percentile_approx(val, 0.5)')`? – thentangler Jul 11 '21 at 14:37
  • @thentangler: the former is an exact percentile, which is not a scalable operation for large datasets, and the latter is approximate but scalable. – abeboparebop Sep 08 '21 at 18:51
16

Since you have access to percentile_approx, one simple solution would be to use it in a SQL command:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df.createOrReplaceTempView("df")  # registerTempTable is deprecated since Spark 2.0
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp")
desertnaut
Shaido
  • This works, but I prefer a solution that I can use within `groupBy` / `agg` at the PySpark level (so that I can easily mix it with other PySpark aggregate functions). – abeboparebop Oct 20 '17 at 09:32
  • @abeboparebop I do not believe it's possible to use only `groupBy` and `agg`; however, a window-based approach should also work. – Shaido Oct 20 '17 at 09:44
  • I have clarified my ideal solution in the question. Clearly this answer does the job, but it's not quite what I want. I'll leave the question open for some time to see if a cleaner answer comes up. – abeboparebop Oct 20 '17 at 09:51
15

(UPDATE: now it is possible, see accepted answer above)


Unfortunately, and to the best of my knowledge, it seems that it is not possible to do this with "pure" PySpark commands (the solution by Shaido provides a workaround with SQL), and the reason is very elementary: in contrast with other aggregate functions, such as mean, approxQuantile does not return a Column type, but a list.

Let's see a quick example with your sample data:

spark.version
# u'2.2.0'

import pyspark.sql.functions as func
from pyspark.sql import DataFrameStatFunctions as statFunc

# aggregate with mean works OK:
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val'))
df_grp_mean.show()
# +---+--------+ 
# |grp|mean_val|
# +---+--------+
# |  B|     5.0|
# |  A|     2.0|
# +---+--------+

# try aggregating by median:
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1))
# AssertionError: all exprs should be Column

# mean aggregation is a Column, but median is a list:

type(func.mean(df['val']))
# pyspark.sql.column.Column

type(statFunc(df).approxQuantile('val', [0.5], 0.1))
# list

I doubt that a window-based approach will make any difference, since as I said the underlying reason is a very elementary one.

See also my answer here for some more details.

desertnaut
9

It seems to be completely solved by pyspark >= 3.1.0 using percentile_approx

import pyspark.sql.functions as func    

df.groupBy("grp").agg(func.percentile_approx("val", 0.5).alias("median"))

For further information see: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.percentile_approx.html

Jan_ewazz
  • From version 3.4+ (and also already in 3.3.1) the median function is directly available https://github.com/apache/spark/blob/e170a2eb236a376b036730b5d63371e753f1d947/python/pyspark/sql/functions.py#L633 – Jan_ewazz Dec 20 '22 at 09:05
8

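The simplest way to do this with pyspark==2.4.5 is:

from pyspark.sql.functions import expr

# percentile() with an array argument returns an array, so take element 0
df \
    .groupby('grp') \
    .agg(expr('percentile(val, array(0.5))')[0].alias('p50')) \
    .show()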

output:

+---+---+
|grp|p50|
+---+---+
|  B|5.0|
|  A|2.0|
+---+---+
Kxrr
0

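The problem with "percentile_approx(val, 0.5)": if, for example, the values are [1,2,3,4], it returns 2 as the median, whereas the function below returns 2.5:

import statistics

import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

# Cast to float: the UDF is declared as DoubleType, and statistics.median can return an int.
median_udf = F.udf(lambda x: float(statistics.median(x)) if x else None, DoubleType())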

... .groupBy('something').agg(median_udf(F.collect_list(F.col('value'))).alias('median'))
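
A note on return types with this approach: `statistics.median` returns an `int` for an odd-length list of ints, and, as far as I know, a Python UDF declared with `DoubleType()` yields null when the function hands back an `int`. Below is a minimal pure-Python sketch (no Spark needed) of a function that always returns a float or None; `safe_median` is a hypothetical helper name.

```python
import statistics
from typing import Optional, Sequence


def safe_median(values: Optional[Sequence[float]]) -> Optional[float]:
    """Median as a float, or None for a missing or empty group."""
    if not values:
        return None
    return float(statistics.median(values))


odd = statistics.median([1, 2, 3])      # middle element, input type preserved
even = statistics.median([1, 2, 3, 4])  # mean of the two middle elements
```

Assuming the null behaviour described above, `safe_median` could be wrapped with `F.udf(safe_median, DoubleType())` instead of a lambda.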
eid
0

The answers above may not give the right result when a group has an even number of entries. To handle that case in general, average the 50th percentile and the next value.

df_grp = df.groupBy('grp').agg(
  F.percentile_approx('val', 0.5).alias('med_val'),
  ((F.percentile_approx('val', 0.5)+ F.percentile_approx('val', 0.500000000001))*.5).alias('med_val2')
  )
df_grp.show()

In the above code, med_val2 gives you the correct median even when the group has an even number of entries. The number 0.500000000001 is chosen because it is only slightly above 0.5, so that it works even for large datasets.
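
The effect of the averaging trick can be checked without Spark. The sketch below assumes that `percentile_approx` behaves like a nearest-rank percentile on small groups (the smallest value whose cumulative share reaches the requested fraction); `nearest_rank` and `averaged_median` are hypothetical helper names, not Spark functions.

```python
import math
import statistics


def nearest_rank(values, p):
    """Smallest value v such that at least a fraction p of the values are <= v."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))  # 1-based rank
    return ordered[rank - 1]


def averaged_median(values, eps=1e-12):
    """Average the values at 0.5 and just above 0.5, mirroring med_val2."""
    return (nearest_rank(values, 0.5) + nearest_rank(values, 0.5 + eps)) * 0.5


even_group = [1, 2, 3, 4]
odd_group = [1, 2, 3]
```

Under this model, an odd-sized group gets the same rank for both percentiles as long as `eps * n` stays below 0.5, which is why the offset has to be tiny.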

prashanth
0

Spark 3.4+ has median (exact median) which can be accessed directly in PySpark:

F.median('val')

With your example dataframe:

df.groupBy('grp').agg(F.median('val')).show()
# +---+-----------+
# |grp|median(val)|
# +---+-----------+
# |  A|        2.0|
# |  B|        5.0|
# +---+-----------+

Other quantiles (quartiles, percentiles, etc.) can be calculated using percentile or percentile_approx:

  • Approximate values for given percent_rank (percentile) values:

    # Use a new name so that the original df stays available
    df_approx = df.groupBy('grp').agg(
        F.expr('percentile_approx(val, .25)').alias('lower_quartile_approx'),
        F.expr('percentile_approx(val, .75)').alias('upper_quartile_approx'),
        F.expr('percentile_approx(val, array(.25, .5, .75))').alias('all_quartiles_approx'),
        F.expr('percentile_approx(val, .9)').alias('90th_percentile_approx'),
    )
    df_approx.show()
    # +---+---------------------+---------------------+--------------------+----------------------+
    # |grp|lower_quartile_approx|upper_quartile_approx|all_quartiles_approx|90th_percentile_approx|
    # +---+---------------------+---------------------+--------------------+----------------------+
    # |  A|                    1|                    3|           [1, 2, 3]|                     3|
    # |  B|                    4|                    6|           [4, 5, 6]|                     6|
    # +---+---------------------+---------------------+--------------------+----------------------+
    
  • Accurate values for given percent_rank (percentile) values:

    # Use a new name so that the original df stays available
    df_acc = df.groupBy('grp').agg(
        F.expr('percentile(val, .25)').alias('lower_quartile_acc'),
        F.expr('percentile(val, .75)').alias('upper_quartile_acc'),
        F.expr('percentile(val, array(.25, .5, .75))').alias('all_quartiles_acc'),
        F.expr('percentile(val, .9)').alias('90th_percentile_acc'),
    )
    df_acc.show()
    # +---+------------------+------------------+-----------------+-------------------+
    # |grp|lower_quartile_acc|upper_quartile_acc|all_quartiles_acc|90th_percentile_acc|
    # +---+------------------+------------------+-----------------+-------------------+
    # |  A|               1.5|               2.5|  [1.5, 2.0, 2.5]| 2.8000000000000003|
    # |  B|               4.5|               5.5|  [4.5, 5.0, 5.5]|  5.800000000000001|
    # +---+------------------+------------------+-----------------+-------------------+
    
ZygD