0

I have the below dataframe .

scala> df.show
+---+------+---+
|  M|Amount| Id|
+---+------+---+
|  1|     5|  1|
|  1|    10|  2|
|  1|    15|  3|
|  1|    20|  4|
|  1|    25|  5|
|  1|    30|  6|
|  2|     2|  1|
|  2|     4|  2|
|  2|     6|  3|
|  2|     8|  4|
|  2|    10|  5|
|  2|    12|  6|
|  3|     1|  1|
|  3|     2|  2|
|  3|     3|  3|
|  3|     4|  4|
|  3|     5|  5|
|  3|     6|  6|
+---+------+---+

created by

val df=Seq( (1,5,1), (1,10,2), (1,15,3), (1,20,4), (1,25,5), (1,30,6), (2,2,1), (2,4,2), (2,6,3), (2,8,4), (2,10,5), (2,12,6), (3,1,1), (3,2,2), (3,3,3), (3,4,4), (3,5,5), (3,6,6) ).toDF("M","Amount","Id")

Here I have a base column M and is ranked as ID based on Amount. I am trying to compute the percentile keeping M as a group but for every last three values of Amount.

I am Using the below code to find the percentile for a group. But how can I target the last three values. ?

 df.withColumn("percentile",percentile_approx(col("Amount") ,lit(.5)) over Window.partitionBy("M"))

Expected Output

+---+------+---+-----------------------------------+
|  M|Amount| Id| percentile                        |
+---+------+---+-----------------------------------+
|  1|     5|  1| percentile(Amount) whose (Id-1)   |
|  1|    10|  2| percentile(Amount) whose (Id-1,2) |
|  1|    15|  3| percentile(Amount) whose (Id-1,3) |
|  1|    20|  4| percentile(Amount) whose (Id-2,4) |
|  1|    25|  5| percentile(Amount) whose (Id-3,5) |
|  1|    30|  6| percentile(Amount) whose (Id-4,6) |
|  2|     2|  1| percentile(Amount) whose (Id-1)   |
|  2|     4|  2| percentile(Amount) whose (Id-1,2) |
|  2|     6|  3| percentile(Amount) whose (Id-1,3) |
|  2|     8|  4| percentile(Amount) whose (Id-2,4) |
|  2|    10|  5| percentile(Amount) whose (Id-3,5) |
|  2|    12|  6| percentile(Amount) whose (Id-4,6) |
|  3|     1|  1| percentile(Amount) whose (Id-1)   |
|  3|     2|  2| percentile(Amount) whose (Id-1,2) |
|  3|     3|  3| percentile(Amount) whose (Id-1,3) |
|  3|     4|  4| percentile(Amount) whose (Id-2,4) |
|  3|     5|  5| percentile(Amount) whose (Id-3,5) |
|  3|     6|  6| percentile(Amount) whose (Id-4,6) |
+---+------+---+----------------------------------+

This seems to be little bit tricky to me as I am still learning spark.Expecting answers from enthusiasts here.

abc_spark
  • 383
  • 3
  • 19

1 Answers1

2

Adding orderBy("Amount") and rowsBetween(-2,0) to the Window definition gets the required result:

  • orderBy sorts the rows within each group by Amount
  • rowsBetween takes only the current row and the two rows before into account when calculating the percentile
val w = Window.partitionBy("M").orderBy("Amount").rowsBetween(-2,0)

df.withColumn("percentile",PercentileApprox.percentile_approx(col("Amount") ,lit(.5))
      .over(w))
  .orderBy("M", "Amount") //not really required, just to make the output more readable
  .show()

prints

+---+------+---+----------+
|  M|Amount| Id|percentile|
+---+------+---+----------+
|  1|     5|  1|         5|
|  1|    10|  2|         5|
|  1|    15|  3|        10|
|  1|    20|  4|        15|
|  1|    25|  5|        20|
|  1|    30|  6|        25|
|  2|     2|  1|         2|
|  2|     4|  2|         2|
|  2|     6|  3|         4|
|  2|     8|  4|         6|
|  2|    10|  5|         8|
|  2|    12|  6|        10|
|  3|     1|  1|         1|
|  3|     2|  2|         1|
|  3|     3|  3|         2|
|  3|     4|  4|         3|
|  3|     5|  5|         4|
|  3|     6|  6|         5|
+---+------+---+----------+
werner
  • 13,518
  • 6
  • 30
  • 45