
I'm using spark-sql-2.4.1v and I'm trying to find quantiles, i.e. percentile 0, percentile 25, etc., on each column of my data. Since I'm calculating multiple percentiles, how do I retrieve each calculated percentile from the results?

My dataframe df:

+----+---------+-------------+----------+-----------+-------+
|  id|     date|      revenue|con_dist_1| con_dist_2| zone  |
+----+---------+-------------+----------+-----------+-------+
|  10|1/15/2018|  0.010680705|        10|0.019875458|  east |
|  10|1/15/2018|  0.006628853|         4|0.816039063|  west |
|  10|1/15/2018|   0.01378215|        20|0.082049528|  east |
|  10|1/15/2018|  0.010680705|         6|0.019875458|  west |
|  10|1/15/2018|  0.006628853|        30|0.816039063|  east |
+----+---------+-------------+----------+-----------+-------+

The final dataframe should be something like the below, i.e. one set of quantiles per zone:

+---+---------+-----------+-------+-------------+-----------+-----------+
| id|     date|    revenue|  zone | perctile_col| quantile_0|quantile_10|
+---+---------+-----------+-------+-------------+-----------+-----------+
| 10|1/15/2018|0.010680705|  east |  con_dist_1 |       10.0|       30.0|
| 10|1/15/2018|0.010680705|  east |  con_dist_2 |0.019875458|0.816039063|
| 10|1/15/2018|0.010680705|  west |  con_dist_1 |        4.0|        6.0|
| 10|1/15/2018|0.010680705|  west |  con_dist_2 |0.019875458|0.816039063|
+---+---------+-----------+-------+-------------+-----------+-----------+

Is there any way to use partitionBy together with the approxQuantile function? Would this be processed using repartition("zone"), i.e. without collecting the dataset for each zone?
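For reference, the only approach I can see with approxQuantile alone is to filter the dataframe once per zone, which is what I want to avoid (just a rough sketch; the probabilities and the relativeError value are placeholders):

import org.apache.spark.sql.functions.col

// What I am trying to avoid: one filter + approxQuantile call per zone value
val zones = df.select("zone").distinct().collect().map(_.getString(0))
val quantilesPerZone = zones.map { z =>
  // approxQuantile works on the whole (filtered) dataframe, not per group
  z -> df.filter(col("zone") === z)
         .stat.approxQuantile(Array("con_dist_1", "con_dist_2"), Array(0.0, 0.25), 0.0)
}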

  • How do you plan to handle the revenue column here (since the result only has 2 columns)? Based on the text, is the expected result something akin to the result here: https://stackoverflow.com/questions/60561513/how-to-include-map-calculated-percentiles-to-the-result-dataframe/60574300#60574300 but with twice as many rows (since there are 2 zones)? – Shaido Mar 13 '20 at 01:47

1 Answer


approxQuantile is not well suited here since it does not support grouping. However, the problem can instead be solved using percentile_approx and a Spark window function (a groupBy would also work; which one to use depends on the desired dataframe format, and a small sketch of the groupBy variant is included at the end of this answer). First we do some setup:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named spark (already available in spark-shell)

val df = Seq(
    (10, "1/15/2018", 0.010680705, 10, 0.019875458, "east"),
    (10, "1/15/2018", 0.006628853,  4, 0.816039063, "west"),
    (10, "1/15/2018", 0.01378215,  20, 0.082049528, "east"),
    (10, "1/15/2018", 0.010680705,  6, 0.019875458, "west"),
    (10, "1/15/2018", 0.006628853, 30, 0.816039063, "east"))
  .toDF("id", "date", "revenue", "con_dist_1", "con_dist_2", "zone")


val percentiles = Seq(0.1, 1.0)  // Which percentiles to calculate
val cols = Seq("con_dist_1", "con_dist_2")  // The columns to use

The percentiles for each zone can then be calculated as follows:

val window = Window.partitionBy("zone")
val percentile_func = (col: String) => expr(s"percentile_approx(${col}, array(${percentiles.mkString(",")}))")
val df2 = cols.foldLeft(df){case (df, c) => df.withColumn(c, percentile_func(c).over(window))}

The results will be like this:

+---+---------+-----------+----------+--------------------------+----+
|id |date     |revenue    |con_dist_1|con_dist_2                |zone|
+---+---------+-----------+----------+--------------------------+----+
|10 |1/15/2018|0.006628853|[4, 6]    |[0.019875458, 0.816039063]|west|
|10 |1/15/2018|0.010680705|[4, 6]    |[0.019875458, 0.816039063]|west|
|10 |1/15/2018|0.010680705|[10, 30]  |[0.019875458, 0.816039063]|east|
|10 |1/15/2018|0.01378215 |[10, 30]  |[0.019875458, 0.816039063]|east|
|10 |1/15/2018|0.006628853|[10, 30]  |[0.019875458, 0.816039063]|east|
+---+---------+-----------+----------+--------------------------+----+

Next, we want to convert the dataframe to the correct format. This is a slight adaptation of the answer here: How to include/map calculated percentiles to the result dataframe?

val result = cols.map{ c =>
    percentiles
      .zipWithIndex
      .foldLeft(df2.withColumn("perctile_col", lit(c))){ case (df, (perc, index)) =>
        df.withColumn(s"quantile_${perc}", col(c).getItem(index))
      }
  }
  .reduce(_.union(_))
  .drop(cols: _*) // the percentile array columns are no longer needed

Final dataframe:

+---+---------+-----------+----+------------+------------+------------+
| id|     date|    revenue|zone|perctile_col|quantile_0.1|quantile_1.0|
+---+---------+-----------+----+------------+------------+------------+
| 10|1/15/2018|0.006628853|west|  con_dist_1|         4.0|         6.0|
| 10|1/15/2018|0.010680705|west|  con_dist_1|         4.0|         6.0|
| 10|1/15/2018|0.010680705|east|  con_dist_1|        10.0|        30.0|
| 10|1/15/2018| 0.01378215|east|  con_dist_1|        10.0|        30.0|
| 10|1/15/2018|0.006628853|east|  con_dist_1|        10.0|        30.0|
| 10|1/15/2018|0.006628853|west|  con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018|0.010680705|west|  con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018|0.010680705|east|  con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018| 0.01378215|east|  con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018|0.006628853|east|  con_dist_2| 0.019875458| 0.816039063|
+---+---------+-----------+----+------------+------------+------------+
    @BdEngineer: I should be able to take a closer look in a while. But from a quick glance, it would help if you added the expected output from the two dataframes in the question. – Shaido Mar 25 '20 at 02:42
    @BdEngineer: On a second note, did this answer help you solve the question here? :) – Shaido Mar 25 '20 at 03:39
    @BdEngineer: The `foldLeft` part is actually just multiple `withColumn` but with different columns as input so it should not be too slow (and it's done directly on the executors). – Shaido Mar 25 '20 at 07:42
  • @BdEngineer: `withColumn` will just add information to the dataframe (not like select). The input to `foldLeft` is `df2.withColumn("perctile_col", lit(c))` which is a dataframe with all relevant columns. – Shaido Mar 26 '20 at 01:33
  • @BdEngineer: So that's why the excel result looks different. It should be possible to solve with a window function (for grouping and sorting) and `row_number`. The two appropriate rows can then be selected and then you can take care of the fractional part afterwards. – Shaido Apr 09 '20 at 01:07
    @BdEngineer: Could you add the data and all additional info (should it be done for each group or the whole dataset / multiple percentiles / etc.) to a new question? It's a bit too much to answer in comments. – Shaido Apr 09 '20 at 08:14