
I am trying to compute a percentile over a column using a Window function, as shown below. I have referred here in order to use the ApproximatePercentile definition over a group.

val df1 = Seq(
    (1, 10.0), (1, 20.0), (1, 40.6), (1, 15.6), (1, 17.6), (1, 25.6),
    (1, 39.6), (2, 20.5), (2 ,70.3), (2, 69.4), (2, 74.4), (2, 45.4),
    (3, 60.6), (3, 80.6), (4, 30.6), (4, 90.6)
).toDF("ID","Count")

val idBucketMapping = Seq((1, 4), (2, 3), (3, 2), (4, 2))
    .toDF("ID", "Bucket")

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, typedLit}

object PercentileApprox {
    // Wraps the internal Catalyst ApproximatePercentile expression so that it
    // can be called with Column arguments, like a built-in aggregate function.
    def percentile_approx(col: Column, percentage: Column,
                          accuracy: Column): Column = {
        val expr = new ApproximatePercentile(
            col.expr, percentage.expr, accuracy.expr
        ).toAggregateExpression
        new Column(expr)
    }

    def percentile_approx(col: Column, percentage: Column): Column =
        percentile_approx(col, percentage,
            lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
}
import PercentileApprox._
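
As a side note: on recent Spark versions (3.1+, as far as I know) a built-in percentile_approx is exposed in org.apache.spark.sql.functions, and on older versions the SQL function of the same name can be reached through expr, avoiding the Catalyst internals above. A minimal sketch; the percentage array and the accuracy (10000 is the default) are illustrative values, not the ones used in this question:

import org.apache.spark.sql.functions.expr

// the same window aggregation, expressed via the SQL function registry
val viaSql = df1.withColumn(
    "percentile",
    expr("percentile_approx(Count, array(0.0, 0.25, 0.5, 0.75), 10000)")
        .over(Window.partitionBy("ID"))
)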

// doBucketing generates the percentage points for a given number of buckets
// (defined first, since it is used just below)
def doBucketing(bucket_size: Int) = (1 until bucket_size)
    .scanLeft(0d)((a, _) => a + (1 / bucket_size.toDouble))

val res = df1
    .withColumn("percentile",
        percentile_approx(col("Count"), typedLit(doBucketing(2)))
            .over(Window.partitionBy("ID"))
    )
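
For reference, doBucketing just produces evenly spaced percentage points starting at 0; for example, straight from the definition above:

doBucketing(4)  // Vector(0.0, 0.25, 0.5, 0.75)
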
scala> df1.show
+---+-----+
| ID|Count|
+---+-----+
|  1| 10.0|
|  1| 20.0|
|  1| 40.6|
|  1| 15.6|
|  1| 17.6|
|  1| 25.6|
|  1| 39.6|
|  2| 20.5|
|  2| 70.3|
|  2| 69.4|
|  2| 74.4|
|  2| 45.4|
|  3| 60.6|
|  3| 80.6|
|  4| 30.6|
|  4| 90.6|
+---+-----+


scala> idBucketMapping.show
+---+------+
| ID|Bucket|
+---+------+
|  1|     4|
|  2|     3|
|  3|     2|
|  4|     2|
+---+------+


scala> res.show
+---+-----+------------------+
| ID|Count|        percentile|
+---+-----+------------------+
|  1| 10.0|[10.0, 20.0, 40.6]|
|  1| 20.0|[10.0, 20.0, 40.6]|
|  1| 40.6|[10.0, 20.0, 40.6]|
|  1| 15.6|[10.0, 20.0, 40.6]|
|  1| 17.6|[10.0, 20.0, 40.6]|
|  1| 25.6|[10.0, 20.0, 40.6]|
|  1| 39.6|[10.0, 20.0, 40.6]|
|  3| 60.6|[60.6, 60.6, 80.6]|
|  3| 80.6|[60.6, 60.6, 80.6]|
|  4| 30.6|[30.6, 30.6, 90.6]|
|  4| 90.6|[30.6, 30.6, 90.6]|
|  2| 20.5|[20.5, 69.4, 74.4]|
|  2| 70.3|[20.5, 69.4, 74.4]|
|  2| 69.4|[20.5, 69.4, 74.4]|
|  2| 74.4|[20.5, 69.4, 74.4]|
|  2| 45.4|[20.5, 69.4, 74.4]|
+---+-----+------------------+

Up to here, it is all well and good and the logic is simple. But I need the results in a dynamic fashion: the argument doBucketing(2) to this function should be taken from idBucketMapping, based on the ID value.

This seems a little bit tricky to me. Is this possible by any means?

Expected output (the number of percentile buckets is taken from the idBucketMapping DataFrame):

+---+-----+------------------------+
|ID |Count|percentile              |
+---+-----+------------------------+
|1  |10.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |20.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |40.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |15.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |17.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |25.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |39.6 |[10.0, 15.6, 20.0, 39.6]|
|3  |60.6 |[60.6, 60.6]            |
|3  |80.6 |[60.6, 60.6]            |
|4  |30.6 |[30.6, 30.6]            |
|4  |90.6 |[30.6, 30.6]            |
|2  |20.5 |[20.5, 45.4, 70.3]      |
|2  |70.3 |[20.5, 45.4, 70.3]      |
|2  |69.4 |[20.5, 45.4, 70.3]      |
|2  |74.4 |[20.5, 45.4, 70.3]      |
|2  |45.4 |[20.5, 45.4, 70.3]      |
+---+-----+------------------------+
– abc_spark

2 Answers


I have a solution for you that is extremely inelegant and works only if you have a limited number of possible bucket sizes.

My first version is very ugly.

import org.apache.spark.sql.functions.when
import spark.implicits._ // for the 'Bucket symbol syntax (both automatic in spark-shell)

// for the sake of clarity, let's define a function that generates the
// window aggregation
def per(x: Int) = percentile_approx(col("Count"), typedLit(doBucketing(x)))
    .over(Window.partitionBy("ID"))

// then, we simply try to match the Bucket column with a possible value
val res = df1
    .join(idBucketMapping, Seq("ID"))
    .withColumn("percentile",
        when('Bucket === 2, per(2))
            .otherwise(when('Bucket === 3, per(3))
            .otherwise(per(4)))
    )

That's nasty, but it works in your case. Slightly less ugly, with the very same logic: you can define a set of possible numbers of buckets and use it to do the same thing as above.

val possible_number_of_buckets = 2 to 5

val res = df1
    .join(idBucketMapping, Seq("ID"))
    .withColumn("percentile", possible_number_of_buckets.tail
        .foldLeft(per(possible_number_of_buckets.head)) { (column, size) =>
            when('Bucket === size, per(size)).otherwise(column)
        })
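
If the set of possible bucket sizes is not known up front, you could instead collect it from idBucketMapping itself. This is only a sketch: it assumes the mapping table is small enough to collect to the driver, and it reuses the per helper and the foldLeft trick from above.

// collect the distinct bucket sizes (assumed to be a small set)
val bucketSizes = idBucketMapping
    .select("Bucket").distinct
    .collect()
    .map(_.getInt(0))
    .toSeq

val res = df1
    .join(idBucketMapping, Seq("ID"))
    .withColumn("percentile", bucketSizes.tail
        .foldLeft(per(bucketSizes.head)) { (column, size) =>
            when('Bucket === size, per(size)).otherwise(column)
        })

Keep in mind that every distinct bucket size still adds one more window aggregation to the plan, so this only stays reasonable for a handful of sizes.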
– Oli

percentile_approx takes percentage and accuracy. It seems they both must be constant literals (foldable expressions). Thus, we can't compute percentile_approx at runtime with a dynamically calculated percentage and accuracy.

ref - Apache Spark percentile_approx source on GitHub
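
For what it is worth, the "static" workaround discussed in the comments below could look roughly like this. It is only a sketch, not a tested implementation: it collects the small mapping table on the driver, runs one aggregation per ID with that ID's constant percentage array (reusing percentile_approx and doBucketing from the question), and joins the union of the results back onto the original rows.

// one aggregation per ID, each with its own constant percentage literal
val perIdPercentiles = idBucketMapping.collect()
    .map { row =>
        val (id, bucket) = (row.getInt(0), row.getInt(1))
        df1.filter(col("ID") === id)
            .agg(percentile_approx(col("Count"), typedLit(doBucketing(bucket)))
                .as("percentile"))
            .withColumn("ID", lit(id))
    }
    .reduce(_ union _)

val resStatic = df1.join(perIdPercentiles, Seq("ID"))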

– Som
  • Yes it is. But isn't it possible to do this per group bucket (which is constant over a window)? – abc_spark Jun 05 '20 at 10:19
  • If you divide the dataframe based on the `ID` filter and pass the `percentage` computed using the `bucket` for that `ID` to the new dataframe, then you can, but that is again a static way. I think you can't do it dynamically, since `percentile_approx` does not take the percentage as a column. Anyway, you can try :) – Som Jun 05 '20 at 10:27
  • I did not get that clearly. Isn't the percentage expected to be a column? If yes, can't it be constant per group? – abc_spark Jun 05 '20 at 16:47
  • Could you please try out your understanding? It would be good if we solve this query – Som Jun 05 '20 at 17:16