I want to calculate percentiles using groupBy in Spark Scala, following the answer to How to use approxQuantile by group? (specifically the Spark < 3.1 solution).
Here is my wrapper function:
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile

// A wrapper function that can be used in conjunction with groupBy
def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
  // Build the Catalyst ApproximatePercentile expression and wrap it back into a Column
  val expr = new ApproximatePercentile(
    col.expr, percentage.expr, accuracy.expr
  ).toAggregateExpression
  new Column(expr)
}
I use the wrapper function to calculate percentiles:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, typedLit}

def calPercentileBoundary(df: DataFrame, percentileMin: Float, percentileMax: Float,
                          percentileInterval: Float, percentileAccuracy: Int): DataFrame = {
  df.groupBy("col1", "col2")
    .agg(percentile_approx(
      col("scores"),
      typedLit(percentileMin to percentileMax by percentileInterval),
      lit(percentileAccuracy)) as "boundaries")
}
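As an aside, the percentage argument could also be built as an explicit Seq[Double] instead of the Float range; a minimal sketch, assuming the same 0-to-1 range with a 0.01 step that I use in my call below:

import org.apache.spark.sql.functions.typedLit

// Sketch: build the percentile percentages explicitly
// (mirrors percentileMin = 0, percentileMax = 1, percentileInterval = 0.01)
val percentages: Seq[Double] = (0 to 100).map(_ / 100.0)
val percentageCol = typedLit(percentages) // becomes an ArrayType(DoubleType) literal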
I have some transformations on my original data, and the result is assigned to val df.
Then I call the above function:
val dfBoundaries = calPercentileBoundary(df, 0f, 1f, 0.01f, 10000)
I write the DataFrame afterwards. I executed the Spark job twice: one run took only 4 minutes, while the other took about 5 times as long, and it gets worse when the data is larger. I'm wondering what causes the difference in execution time. I have 500 executors, each with 2 executor cores; is that affecting the job? Do I need to shuffle or repartition?
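To clarify what I mean by repartitioning: I'm thinking of pre-partitioning by the group keys before the aggregation, roughly like the sketch below (the 500 is just an assumption matching my executor count, not something I've tuned):

// Sketch of what I mean by repartitioning before the groupBy
// (500 partitions is an assumption based on the executor count; col1/col2 are the group keys)
val repartitioned = df.repartition(500, col("col1"), col("col2"))
val dfBoundariesRepartitioned = calPercentileBoundary(repartitioned, 0f, 1f, 0.01f, 10000)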
Would switching to SQL directly, like in this answer, make it faster? https://stackoverflow.com/a/46846174/5125692
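What I have in mind for the SQL route is roughly the sketch below; the temp view name and the programmatically built array literal are my own assumptions, not taken from the linked answer:

// Sketch of the SQL variant (assumes a SparkSession `spark` and the same df / col1 / col2 / scores as above)
df.createOrReplaceTempView("scores_view")
// Build "array(0.0, 0.01, ..., 1.0)" as a SQL array literal for the percentages
val percentagesSql = (0 to 100).map(i => (i / 100.0).toString).mkString("array(", ", ", ")")
val dfBoundariesSql = spark.sql(
  s"""SELECT col1, col2,
     |       percentile_approx(scores, $percentagesSql, 10000) AS boundaries
     |FROM scores_view
     |GROUP BY col1, col2""".stripMargin)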