2

I am trying to groupBy and then calculate percentile on PySpark dataframe. I've tested the following piece of code according to this Stack Overflow post:

from pyspark.sql.types import FloatType
import pyspark.sql.functions as func
import numpy as np

qt_udf = func.udf(lambda x,qt: float(np.percentile(x,qt)), FloatType())
df_out = df_in.groupBy('Id').agg(func.collect_list('value').alias('data'))\
.withColumn('median', qt_udf(func.col('data'),func.lit(0.5)).cast("string"))  

df_out.show()

But get the following error:

Traceback (most recent call last): > df_out.show() ....> return lambda *a: f(*a) AttributeError: 'module' object has no attribute 'percentile'

This is because of numpy version (1.4.1), the percentile function was added from version 1.5. It is not possible to update numpy version in the short term.

ZygD
  • 22,092
  • 39
  • 79
  • 102
Marc S
  • 97
  • 2
  • 11

2 Answers2

1

Define a window and use the inbuilt percent_rank function to compute percentile values.

from pyspark.sql import Window
from pyspark.sql import functions as func
w = Window.partitionBy(df_in.Id).orderBy(df_in.value) #assuming default ascending order
df_out = df_in.withColumn('percentile_col',func.percent_rank().over(w))
Vamsi Prabhala
  • 48,685
  • 4
  • 36
  • 58
0

Question's title suggests that OP wanted to calculate percentiles. But the body shows that he needed to calculate median in groups.

Test dataset:

from pyspark.sql import SparkSession, functions as F, Window as W, Window
spark = SparkSession.builder.getOrCreate()
df_in = spark.createDataFrame(
    [('1', 10),
     ('1', 11),
     ('1', 12),
     ('1', 13),
     ('2', 20)],
    ['Id', 'value']
)

Percentiles of given data points in groups:

w = W.partitionBy('Id').orderBy('value')
df_in = df_in.withColumn('percentile_of_value_by_Id', F.percent_rank().over(w))

df_in.show()
#+---+-----+-------------------------+
#| Id|value|percentile_of_value_by_Id|
#+---+-----+-------------------------+
#|  1|   10|                      0.0|
#|  1|   11|       0.3333333333333333|
#|  1|   12|       0.6666666666666666|
#|  1|   13|                      1.0|
#|  2|   20|                      0.0|
#+---+-----+-------------------------+

Median (accurate and approximate):

df_out = (df_in.groupBy('Id').agg(
    F.expr('percentile(value, .5)').alias('median_accurate'),  # for small-mid dfs
    F.percentile_approx('value', .5).alias('median_approximate')  # for mid-large dfs
))

df_out.show()
#+---+---------------+------------------+
#| Id|median_accurate|median_approximate|
#+---+---------------+------------------+
#|  1|           11.5|                11|
#|  2|           20.0|                20|
#+---+---------------+------------------+
ZygD
  • 22,092
  • 39
  • 79
  • 102