11

I am trying to calculate the percentile of a column in a DataFrame. I can't find any percentile_approx function among Spark's aggregation functions.

For example, in Hive we have percentile_approx and can use it as follows:

hiveContext.sql("select percentile_approx(Open_Rate, 0.10) from myTable")

But I want to do it using the Spark DataFrame API for performance reasons.

Sample data set

|User ID|Open_Rate|
|-------|---------|
|A1     |10.3     |
|B1     |4.04     |
|C1     |21.7     |
|D1     |18.6     |

I want to find out how many users fall into the 10th percentile, the 20th percentile, and so on. I want to do something like this:

df.select($"id",Percentile($"Open_Rate",0.1)).show
ZygD
dheee

3 Answers

10

Since Spark 2.0, things have gotten easier: simply use approxQuantile from DataFrameStatFunctions, like:

df.stat.approxQuantile("Open_Rate", Array(0.25, 0.50, 0.75), 0.0)

There are also other useful statistical functions for DataFrames in DataFrameStatFunctions.
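A minimal sketch of the call in context, assuming a local SparkSession named spark and the sample data from the question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("quantiles").getOrCreate()
import spark.implicits._

// Sample data from the question
val df = Seq(("A1", 10.3), ("B1", 4.04), ("C1", 21.7), ("D1", 18.6))
  .toDF("User ID", "Open_Rate")

// approxQuantile(columnName, probabilities, relativeError);
// relativeError = 0.0 requests exact quantiles (more expensive on large data)
val quantiles = df.stat.approxQuantile("Open_Rate", Array(0.25, 0.50, 0.75), 0.0)
// quantiles is an Array[Double], one value per requested probability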

Yulin GUO
  • Good find. I like that it lets you define the relative error as a number between 0 and 1. One caveat: this function will not work for aggregated calculations on multiple groups concurrently. For those who are looking to calculate percentiles on multiple groups at once, have a look at `percentile_approx`, which is a Spark SQL function (see the sketch after these comments). It takes an optional integer argument related to the number of observations per group, 10,000 by default; this means it returns the exact percentile for groups with fewer than 10,000 observations. Specify a larger value for more precision. – Raphvanns Jan 23 '18 at 18:52
  • What do we need to import for df.stat.approxQuantile? – Haha TTpro Dec 12 '18 at 10:42
  • @HahaTTpro, nothing more. With any DataFrame instance, you can call dataframeInstance.stat.approxQuantile. – Yulin GUO Dec 14 '18 at 02:26
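A sketch of the grouped pattern from the first comment, assuming Spark 2.1+ (where percentile_approx is available as a SQL expression) and a hypothetical grouping column named segment:

import org.apache.spark.sql.functions.expr

// percentile_approx(col, percentage [, accuracy]); accuracy defaults to 10000
val perGroup = df
  .groupBy("segment") // hypothetical column, for illustration only
  .agg(expr("percentile_approx(Open_Rate, 0.1, 10000)").as("p10"))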
4

Spark SQL and the Scala DataFrame/Dataset APIs are executed by the same engine. Equivalent operations generate equivalent execution plans, which you can inspect with explain:

sql(...).explain
df.explain

When it comes to your specific question, it is a common pattern to mix Spark SQL and Scala DSL syntax because, as you have discovered, their capabilities are not yet equivalent. (Another example is the difference between SQL's explode() and the DSL's explode(), the latter being more powerful but also less efficient due to marshalling.)

The simple way to do it is as follows:

df.registerTempTable("tmp_tbl")
val newDF = sql(/* do something with tmp_tbl */)
// Continue using newDF with Scala DSL

What you need to keep in mind if you go with the simple way is that temporary table names are cluster-global (up to 1.6.x). Therefore, you should use randomized table names if the code may run simultaneously more than once on the same cluster.
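A concrete sketch of that pattern for the percentile question, assuming the hiveContext from the question (percentile_approx requires Hive support on 1.6):

import java.util.UUID

// Randomize the name: temp table names are cluster-global (up to 1.6.x)
val tmpTbl = "tmp_tbl_" + UUID.randomUUID().toString.replace("-", "")
df.registerTempTable(tmpTbl)

val newDF = hiveContext.sql(
  s"select percentile_approx(Open_Rate, 0.10) as p10 from $tmpTbl")
// Continue using newDF with the Scala DSL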

On my team the pattern is common enough that we have added a .sql() implicit to DataFrame which automatically registers and then unregisters a temp table for the scope of the SQL statement.
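The answer does not show that implicit's implementation; a hypothetical reconstruction for Spark 1.6 might look like this (all names are my own):

import java.util.UUID
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: register a temp table under a random name, run the
// query against it, then drop the registration. Dropping after sql() is safe
// because the query is analyzed (and the table resolved) eagerly.
implicit class DataFrameSqlOps(df: DataFrame) {
  def sql(buildQuery: String => String): DataFrame = {
    val name = "tmp_" + UUID.randomUUID().toString.replace("-", "")
    df.registerTempTable(name)
    try df.sqlContext.sql(buildQuery(name))
    finally df.sqlContext.dropTempTable(name)
  }
}

// Usage: df.sql(t => s"select percentile_approx(Open_Rate, 0.1) from $t")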

Sim
  • You are right, and thanks for the detailed explanation. But the reason I wanted to do this with DataFrames is that I have a few other methods and UDFs I need to apply. If I use plain SQL/Hive, I will have to change a lot of code to achieve this percentile, and I'm not sure what other issues might occur if I change the code. – dheee Jun 14 '16 at 18:22
  • @dheee I'm not sure I understand your concern... (1) You can use your own UDF from both SQL and the DSL. (2) You don't need to change all your code, just the stage that produces the column with the percentile. – Sim Jun 16 '16 at 02:22
1

I created the bebe library, which makes it easy to calculate the percentile of a column.

Let's start by creating your DataFrame.

import org.apache.spark.sql.types.{DoubleType, StringType}
// createDF is a helper from the spark-daria library (SparkSessionExt)

val df = spark
  .createDF(
    List(
      ("A1", 10.3),
      ("B1", 4.04),
      ("C1", 21.7),
      ("D1", 18.6)
    ),
    List(
      ("User ID", StringType, true),
      ("Open_Rate", DoubleType, true)
    )
  )
df.show()
+-------+---------+
|User ID|Open_Rate|
+-------+---------+
|     A1|     10.3|
|     B1|     4.04|
|     C1|     21.7|
|     D1|     18.6|
+-------+---------+

Now let's calculate the 10th percentile:

import org.apache.spark.sql.functions.{col, lit}
// bebe_percentile comes from the bebe library's function imports

val resDF = df.agg(bebe_percentile(col("Open_Rate"), lit(0.1)).as("10_percentile"))
resDF.show()
+-----------------+
|    10_percentile|
+-----------------+
|5.918000000000001|
+-----------------+

It uses the same underlying code as the SQL percentile method.
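For comparison, a sketch that skips the library and calls that underlying SQL function directly through expr; it should produce the same 5.918 value:

import org.apache.spark.sql.functions.expr

// percentile is Spark SQL's exact-percentile aggregate, which bebe_percentile wraps
val sqlResDF = df.agg(expr("percentile(Open_Rate, 0.1)").as("10_percentile"))
sqlResDF.show()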

Powers