2

Is there a clean way to compute moving percentiles on a Spark DataFrame?

I have a huge DataFrame that I aggregate every 15 minutes, and I would like to compute percentiles on each portion.

df.groupBy(window(col("date").cast("timestamp"), "15 minutes"))
  .agg(sum("session"),mean("session"),percentile_approx("session", 0.5))
  .show()

error: not found: value percentile_approx

So I can compute basic things like the sum and the average, but I also need the median and some other percentiles.

Is there an efficient way to do this in Spark 2.1?

It seems that no median, percentile_approx or Percentile_approx function is implemented in the API.

I saw that this question has already been asked, but the answers did not agree on a single solution, and it was all quite fuzzy to me. So I wanted to know whether, as of August 2017, there is a good and efficient solution.

And since I go through windows of 15 minutes, I'm wondering whether computing the exact value, rather than an approximation, would work?

Thanks a lot for your attention,

Have a good afternoon !

PS: Scala or PySpark, I don't mind; both would be even better!

tricky
  • what is `window` in your sample code? Do you want window function (and thus a sliding window) or non-overlapping windows (groupBy)? – Raphael Roth Aug 30 '17 at 18:31
  • Thanks for your answer and for taking the time to look at my question! I have historical data and I want to aggregate it every minute. Every minute, I have hundreds of records, and on each sliding window (for each minute) I need to compute the median, etc. So I was wondering what a clean way to do it efficiently would be – tricky Aug 31 '17 at 09:21
  • Ok, but in this case the window is not really "sliding"... because for a sliding window you would need window functions. AFAIK sliding means in your case: for every record, take the "surrounding" 15 min of data and calculate the aggregation – Raphael Roth Aug 31 '17 at 10:56
  • Yes, that was a mistake in my explanation! My bad, and thanks for the clarification – tricky Aug 31 '17 at 12:06

2 Answers

3

OK, so I was pretty dumb, I guess.

I just had to add callUDF to my previous idea, percentile_approx. Sorry for the inconvenience.

callUDF("percentile_approx", col("session"), lit(0.5))

So, for example, in my case I wanted to aggregate a two-month historical dataset every minute:

df.groupBy(window((col("date")/1000).cast("timestamp"), "1 minutes"))
.agg(sum("session"),mean("session"),callUDF("percentile_approx", col("session"), lit(0.5)))
.show()

(The timestamp is in milliseconds, hence the /1000.)
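
As a hedged variation on the call above: the same functions can, as far as I understand, also be reached through `expr` with a SQL string, which makes it easy to request several percentiles at once by passing an array. The sketch below assumes a Spark version where `percentile_approx` and `percentile` are available as SQL functions (my understanding is that this holds for 2.1 and later, but it is worth verifying on your cluster). The sample rows and the output column names `approx_quartiles` and `exact_median` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, window}

// Local session for the sketch only; in spark-shell `spark` already exists.
val spark = SparkSession.builder.master("local[*]").appName("percentiles").getOrCreate()
import spark.implicits._

// Hypothetical sample: (epoch milliseconds, session value).
// The first three rows fall in one minute, the last two in the next.
val df = Seq(
  (1503999960000L, 1.0), (1503999970000L, 2.0), (1503999980000L, 3.0),
  (1504000020000L, 10.0), (1504000030000L, 20.0)
).toDF("date", "session")

val stats = df
  .groupBy(window((col("date") / 1000).cast("timestamp"), "1 minute"))
  .agg(
    // Approximate quartiles in one pass: returns an array column.
    expr("percentile_approx(session, array(0.25, 0.5, 0.75))").as("approx_quartiles"),
    // Exact median: keeps all values of the group in memory while aggregating.
    expr("percentile(session, 0.5)").as("exact_median")
  )

stats.orderBy("window").show(false)
```

With only a few hundred records per window, the exact `percentile` variant is probably affordable; the approximate one matters more when a single group gets very large.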

tricky
  • That's great, I did not know that you can use `percentile` as an aggregation function! – Raphael Roth Aug 31 '17 at 10:32
  • Just to clarify: percentile_approx (you could also just use percentile) is a built-in Hive UDAF (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)), so it's implemented not by Spark but by Hive, and you can only use it if you have Hive support (or a HiveContext). – Raphael Roth Aug 31 '17 at 10:58
  • It also works as a window function: `df.withColumn("moving_median", callUDF("percentile_approx", col("session"), lit(0.5)).over(window))`. From Spark 2+, `HiveContext` is no longer required. https://stackoverflow.com/questions/39633614/calculate-quantile-on-grouped-data-in-spark-dataframe – belgacea Oct 11 '19 at 15:16
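
For the truly sliding case discussed in the comments (for every record, aggregate the preceding 15 minutes), a minimal sketch could look like the following. It assumes a numeric column of epoch seconds, here called `ts`, so that `rangeBetween` can express the look-back in seconds; the sample data and the names `ts`, `last15Min` and `moving_median` are hypothetical, and it relies on `percentile_approx` being usable in a SQL expression as described in the last comment.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, expr}

// Local session for the sketch only; in spark-shell `spark` already exists.
val spark = SparkSession.builder.master("local[*]").appName("moving-median").getOrCreate()
import spark.implicits._

// Hypothetical sample: (epoch seconds, session value).
val df = Seq((0L, 1.0), (300L, 5.0), (600L, 3.0), (1800L, 9.0)).toDF("ts", "session")

// For each row, cover the previous 900 seconds (15 minutes) up to and including the row itself.
// Without partitionBy, Spark moves all rows to a single partition, so on real data
// a partition key (for example a day or a user id) is advisable.
val last15Min = Window.orderBy(col("ts")).rangeBetween(-900, 0)

val withMedian = df.withColumn(
  "moving_median",
  expr("percentile_approx(session, 0.5)").over(last15Min)
)

withMedian.orderBy("ts").show()
```

Unlike the groupBy version, this keeps one output row per input record, each carrying the median of its own trailing window.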
2

If you don't need sliding (overlapping) windows, you can do it with groupBy. AFAIK there is no percentile aggregation function, so you either need to implement your own UDAF or use the following approach:

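import org.apache.spark.sql.functions.{collect_list, udf}
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val df = (1 to 100).map( i => (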
  i/10, scala.util.Random.nextDouble)
  ).toDF("time","session")

val calcStats = udf((data:Seq[Double]) => {
  (data.sum,
   data.sum/data.size,
   data.sorted.apply(data.size/2) // is ~ median, replace with your desired logic
  )
})

df.groupBy($"time")
  .agg(collect_list($"session").as("sessions"))
  .withColumn("stats",calcStats($"sessions").cast("struct<sum:double,mean:double,median:double>"))
  .select($"time",$"stats.*")
  .orderBy($"time")
  .show

+----+------------------+-------------------+-------------------+
|time|               sum|               mean|             median|
+----+------------------+-------------------+-------------------+
|   0|3.5441618790222287| 0.3937957643358032| 0.3968893251191352|
|   1|3.6612518806543757| 0.3661251880654376| 0.4395039388994335|
|   2| 4.040992655970037|0.40409926559700365| 0.3522214051715915|
|   3| 4.583175830988081| 0.4583175830988081| 0.5800394949546751|
|   4| 3.849409207658501| 0.3849409207658501|0.43422232330495936|
|   5| 5.514681139649785| 0.5514681139649784| 0.6703416471647694|
|   6| 4.890227540935781| 0.4890227540935781| 0.5515164635420178|
|   7|4.1148083531280095|0.41148083531280094| 0.4384132796986667|
|   8| 5.723834881155167| 0.5723834881155166| 0.6415902834329499|
|   9| 5.559212938582014| 0.5559212938582014| 0.6816268800227596|
|  10|0.8867335786067405| 0.8867335786067405| 0.8867335786067405|
+----+------------------+-------------------+-------------------+
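
The "replace with your desired logic" placeholder in `calcStats` can be filled with any exact percentile rule. One possible sketch, in plain Scala with no Spark dependency, uses linear interpolation between the two closest ranks; the helper name `percentile` and this interpolation rule are my own choices for illustration, not something prescribed by Spark.

```scala
// Exact percentile with linear interpolation between the closest ranks.
// `p` is a fraction in [0, 1]; `data` must be non-empty.
def percentile(data: Seq[Double], p: Double): Double = {
  require(data.nonEmpty, "data must be non-empty")
  require(p >= 0.0 && p <= 1.0, "p must be in [0, 1]")
  val sorted = data.sorted
  val pos = p * (sorted.size - 1)      // fractional index into the sorted data
  val lower = math.floor(pos).toInt
  val upper = math.ceil(pos).toInt
  val weight = pos - lower             // how far pos lies between lower and upper
  sorted(lower) * (1 - weight) + sorted(upper) * weight
}

percentile(Seq(3.0, 1.0, 2.0), 0.5)       // 2.0
percentile(Seq(1.0, 2.0, 3.0, 4.0), 0.5)  // 2.5
```

Inside the UDF, `data.sorted.apply(data.size/2)` could then be swapped for `percentile(data, 0.5)`, and further tuple fields added for other percentiles, with the `struct<...>` cast extended to match.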
Raphael Roth