
I'm using Spark SQL 2.4.1, and I'm trying to compute quantiles, i.e. percentile 0, percentile 25, etc., for each column of my data.

Since I am computing multiple percentiles, how do I retrieve each calculated percentile from the results?

My dataframe df:

+----+---------+-------------+----------+-----------+
|  id|     date|      revenue|con_dist_1| con_dist_2|
+----+---------+-------------+----------+-----------+
|  10|1/15/2018|  0.010680705|         6|0.019875458|
|  10|1/15/2018|  0.006628853|         4|0.816039063|
|  10|1/15/2018|   0.01378215|         4|0.082049528|
|  10|1/15/2018|  0.010680705|         6|0.019875458|
|  10|1/15/2018|  0.006628853|         4|0.816039063|
+----+---------+-------------+----------+-----------+

I need the expected output/result to look like this:

+----+---------+-------------+-------------+------------+-------------+
|  id|     date|      revenue| perctile_col| quantile_0 |quantile_10  |
+----+---------+-------------+-------------+------------+-------------+
|  10|1/15/2018|  0.010680705| con_dist_1  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|  0.010680705| con_dist_2  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|  0.006628853| con_dist_1  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|  0.006628853| con_dist_2  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|   0.01378215| con_dist_1  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|   0.01378215| con_dist_2  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|  0.010680705| con_dist_1  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|  0.010680705| con_dist_2  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|  0.006628853| con_dist_1  |<quant0_val>|<quant10_val>|
|  10|1/15/2018|  0.006628853| con_dist_2  |<quant0_val>|<quant10_val>|
+----+---------+-------------+-------------+------------+-------------+

I have already calculated the quantiles like this, but I need to add them to the output dataframe:

val col_list = Array("con_dist_1", "con_dist_2")
// approxQuantile returns one Array[Double] per input column,
// with one value per requested probability
val quantiles = df.stat.approxQuantile(col_list, Array(0.0, 0.1, 0.5), 0.0)

// indices into each column's quantile array
val percentile_0 = 0
val percentile_10 = 1

val Q0 = quantiles(col_list.indexOf("con_dist_1"))(percentile_0)
val Q10 = quantiles(col_list.indexOf("con_dist_1"))(percentile_10)

How can I get the expected output shown above?

  • `quant0_val` and `quant10_val` are constant values? – eliasah Mar 06 '20 at 10:19
  • I wrote a function to calculate this in one go for multiple columns, yet I ran out of memory several times because the windowing has to reshuffle for each variable. What worked in the end was to take them separately and join them afterwards (see the sketch after these comments). – kael Mar 26 '20 at 08:44
  • @Shaido - Reinstate Monica, I have a use case like this, any advice please? https://stackoverflow.com/questions/63137437/doing-multiple-column-value-look-up-after-joining-with-lookup-dataset – BdEngineer Jul 28 '20 at 15:18
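
A minimal sketch of the separate-then-join workaround kael describes, assuming `id` as the grouping key and Spark SQL's percentile_approx called through expr (the key and the probability list are assumptions, not from the question):

import org.apache.spark.sql.functions.expr

// One aggregation per column avoids a single wide shuffle;
// the per-column results are then joined back on the key.
val perCol = col_list.map { c =>
  df.groupBy("id")
    .agg(expr(s"percentile_approx($c, array(0.0, 0.1))").as(s"${c}_quantiles"))
}
val joined = perCol.reduce(_.join(_, Seq("id")))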

1 Answer


An easy solution is to create multiple dataframes, one for each "con_dist" column, and then use union to merge them. This can be done with a map over col_list as follows:

import org.apache.spark.sql.functions.lit

val col_list = Array("con_dist_1", "con_dist_2")
val quantiles = df.stat.approxQuantile(col_list, Array(0.0, 0.1, 0.5), 0.0)

val percentile_0 = 0  // index of the 0th percentile in each quantile array
val percentile_10 = 1 // index of the 10th percentile

val df2 = df.drop(col_list: _*) // we don't need these columns anymore

val result = col_list
  .zipWithIndex
  .map { case (col, colIndex) =>
    val Q0 = quantiles(colIndex)(percentile_0)
    val Q10 = quantiles(colIndex)(percentile_10)

    df2.withColumn("perctile_col", lit(col))
      .withColumn("quantile_0", lit(Q0))
      .withColumn("quantile_10", lit(Q10))
  }
  .reduce(_.union(_))

The final dataframe will then be:

+---+---------+-----------+------------+-----------+-----------+
| id|     date|    revenue|perctile_col| quantile_0|quantile_10|
+---+---------+-----------+------------+-----------+-----------+
| 10|1/15/2018|0.010680705|  con_dist_1|        4.0|        4.0|
| 10|1/15/2018|0.006628853|  con_dist_1|        4.0|        4.0|
| 10|1/15/2018| 0.01378215|  con_dist_1|        4.0|        4.0|
| 10|1/15/2018|0.010680705|  con_dist_1|        4.0|        4.0|
| 10|1/15/2018|0.006628853|  con_dist_1|        4.0|        4.0|
| 10|1/15/2018|0.010680705|  con_dist_2|0.019875458|0.019875458|
| 10|1/15/2018|0.006628853|  con_dist_2|0.019875458|0.019875458|
| 10|1/15/2018| 0.01378215|  con_dist_2|0.019875458|0.019875458|
| 10|1/15/2018|0.010680705|  con_dist_2|0.019875458|0.019875458|
| 10|1/15/2018|0.006628853|  con_dist_2|0.019875458|0.019875458|
+---+---------+-----------+------------+-----------+-----------+
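
For reference, the same pattern extends to any number of percentiles by folding one literal column per requested probability. A minimal sketch (the probs array below is an assumption, matching the probabilities used above):

import org.apache.spark.sql.functions.lit

val probs = Array(0.0, 0.1, 0.5) // assumed percentile probabilities
val quantiles = df.stat.approxQuantile(col_list, probs, 0.0)
val df2 = df.drop(col_list: _*)

val result = col_list.zipWithIndex.map { case (c, i) =>
  // Tag each frame with its source column, then add one quantile column per probability.
  probs.zipWithIndex.foldLeft(df2.withColumn("perctile_col", lit(c))) {
    case (acc, (p, j)) => acc.withColumn(s"quantile_${(p * 100).toInt}", lit(quantiles(i)(j)))
  }
}.reduce(_.union(_))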
  • @BdEngineer: I'm not sure exactly where and how you want to include this group by. But generally, grouping by a range can easily be done by first creating a new column with a unique identifier per group (e.g., a column called range, as sketched below); see for example: https://stackoverflow.com/questions/48657771/sum-up-into-a-new-column-from-a-range-of-values-of-a-column-in-spark-using-scala – Shaido Mar 10 '20 at 01:16
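
A minimal sketch of the range-column idea from the comment above, assuming a hypothetical bucket width of 0.1 applied to con_dist_2 (both are illustrative assumptions, not from the thread):

import org.apache.spark.sql.functions.{col, floor}

// Derive a coarse "range" identifier, then group on it.
val bucketed = df.withColumn("range", floor(col("con_dist_2") / 0.1))
bucketed.groupBy("range").count().show()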