
I want to calculate percentiles using groupBy in Spark Scala, and I followed the answer here: How to use approxQuantile by group? I'm using the Spark < 3.1 solution.

Here is my wrapper function:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile

// A wrapper function that can be used in conjunction with groupBy
def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
  val expr = new ApproximatePercentile(
    col.expr, percentage.expr, accuracy.expr
  ).toAggregateExpression
  new Column(expr)
}

I use the wrapper function to calculate percentiles:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, typedLit}

def calPercentileBoundary(df: DataFrame, percentileMin: Float, percentileMax: Float, percentileInterval: Float, percentileAccuracy: Int): DataFrame = {
  df.groupBy("col1", "col2")
    .agg(percentile_approx(col("scores"),
      typedLit(percentileMin to percentileMax by percentileInterval),
      lit(percentileAccuracy)) as "boundaries")
}

I apply some transformations to my original data and assign the result to val df.

Then I call the above function:

val dfBoundaries = calPercentileBoundary(df, 0f, 1f, 0.01f, 10000)

I write the DataFrame out afterwards. I executed the Spark job twice: one run took only 4 minutes, the other took about 5 times as long, and the gap gets worse as the data grows. I'm wondering what is causing the difference in execution time. Do I need to shuffle or repartition? I also have 500 executors, each with 2 executor cores. Could that be affecting the job?
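
For context, this is roughly what I mean by repartitioning: partitioning on the grouping keys before the aggregation. It's only a sketch I have not benchmarked, and the partition count of 1000 is just a guess based on my 500 executors × 2 cores:

import org.apache.spark.sql.functions.col

// Sketch: pre-partition by the grouping keys before the aggregation.
// 1000 matches 500 executors * 2 cores, but the right number is a guess.
val repartitioned = df.repartition(1000, col("col1"), col("col2"))
val dfBoundariesRepartitioned = calPercentileBoundary(repartitioned, 0f, 1f, 0.01f, 10000)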

Would switching to SQL directly make it faster, like this: https://stackoverflow.com/a/46846174/5125692?
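
For reference, the SQL-based variant I have in mind would use the built-in percentile_approx SQL function through expr. This is only a sketch with a shortened, hard-coded percentile list for illustration; I have not tried it on my data:

import org.apache.spark.sql.functions.expr

// Sketch of the SQL-expression variant; the percentile list is shortened here
// instead of the full 0.00 to 1.00 range in 0.01 steps.
val dfBoundariesSql = df
  .groupBy("col1", "col2")
  .agg(expr("percentile_approx(scores, array(0.25, 0.5, 0.75), 10000)").as("boundaries"))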

