
I am using Spark SQL 2.4.1 with Java 8. I need to calculate percentiles such as the 25th, 75th, and 90th for some given data.

Given this source dataset:

val df = Seq(
  (10, 20, 30, 40, 50),
  (100, 200, 300, 400, 500),
  (111, 222, 333, 444, 555),
  (1123, 2123, 3123, 4123, 5123),
  (1321, 2321, 3321, 4321, 5321)
).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
   
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10   |20   |30   |40   |50   |
|100  |200  |300  |400  |500  |
|111  |222  |333  |444  |555  |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+

val columnsToCalculate = Seq("col_2","col_3","col_4")

Expected output:

+------+-----+------+
|column|count|mean  |
+------+-----+------+
|col_2 |5    |some-z|
|col_3 |5    |some-y|
|col_4 |5    |some-x|
+------+-----+------+
BdEngineer

2 Answers

Good question. I solved this, though I may be lacking in skills here. I suspect there is a fold-based solution, but I present a data-wrangling approach instead; fold in Scala cannot be executed in parallel, so this approach should be faster.
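
For reference, a minimal, untested sketch of what that fold/reduce idea might look like, using the df and columnsToCalculate from the question (statsDf is a name I made up; the stat names are plain Spark SQL function names):

    import org.apache.spark.sql.functions._

    // Build one row of aggregates per statistic, then union them all together.
    // The union widens the column types to a common type (e.g. int -> double).
    val statsDf = Seq("mean", "min")
      .map { s =>
        df.select(columnsToCalculate.map(c => expr(s"$s($c)").alias(c)): _*)
          .withColumn("stat", lit(s))
      }
      .reduce(_ union _)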

Also, I do this in Scala, but the answers to How to pivot Spark DataFrame? may help you convert it to Java.

I am interested in better solutions. The dynamic list of columns presents some issues, but I continued in that vein and arrived at this solution:

import org.apache.spark.sql.functions._ 
// Add any other imports.

// Gen data. 
val df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")

// The list of columns to apply aggregates against.
val columnsToCalculate = Seq("col_2","col_3","col_4")

// Apply each aggregate individually and tag which metric it is. I could not
// get multiple calcs per .map pass here; expand accordingly.
val df1 = df.select(columnsToCalculate.map(c => mean(col(c)).alias(c)): _*).withColumn("stat", lit("mean"))
val df2 = df.select(columnsToCalculate.map(c => min(col(c)).alias(c)): _*).withColumn("stat", lit("min"))
val df3 = df1.union(df2)

// Data wrangling: prefix each value with its column name and build an array for exploding.
val df4 = df3.withColumn("CombinedArray", array(columnsToCalculate.map { colName =>
  regexp_replace(regexp_replace(col(colName), "(^)", s"$colName: "), "(-)", s", $colName: ")
}: _*))
val df5 = df4.select($"stat", explode($"CombinedArray"))
val df6 = df5.withColumn("split", split(col("col"), ":")).select($"stat", col("split")(0).as("col_name"), col("split")(1).as("metric_value"))

// Final data wrangling.
val res = df6.groupBy($"col_name")
             .pivot($"stat")
             .agg(first($"metric_value"))
             .orderBy($"col_name")
res.show(false)

returns:

+--------+-------+-----+
|col_name|mean   |min  |
+--------+-------+-----+
|col_2   | 977.2 | 20.0|
|col_3   | 1421.4| 30.0|
|col_4   | 1865.6| 40.0|
+--------+-------+-----+

BTW: I could not place your count aspect, though it should union in just the same way; a sketch follows.
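
An untested sketch (dfCount and df3WithCount are names I made up):

    // Add a count row via the same union pattern. The union widens the
    // int/long values to double, so the counts will surface as 5.0.
    val dfCount = df.select(columnsToCalculate.map(c => count(col(c)).alias(c)): _*)
                    .withColumn("stat", lit("count"))
    val df3WithCount = df1.union(df2).union(dfCount)

The wrangling from df4 onward would then run against df3WithCount instead of df3.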

Note: as the other answer states, maybe you just wanted a describe?
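
For reference, that would just be (statistics come back as rows):

    // describe() computes count, mean, stddev, min and max for the named columns.
    df.describe(columnsToCalculate: _*).show(false)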

thebluephantom

There is a summary() API on Dataset which computes basic statistics in the format below:

    ds.summary("count", "min", "25%", "75%", "max").show()
   
    // output:
    // summary age   height
    // count   10.0  10.0
    // min     18.0  163.0
    // 25%     24.0  176.0
    // 75%     32.0  180.0
    // max     92.0  192.0
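
Applied to the df and columnsToCalculate from the question, that could look like this (a sketch; note summary() returns statistics as rows and columns as columns, i.e. the transpose of your expected output):

    // summary() accepts count, mean, stddev, min, max and arbitrary percentiles.
    df.selectExpr(columnsToCalculate: _*)
      .summary("count", "mean", "25%", "75%", "90%")
      .show(false)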

Similarly, you can enrich the DataFrame API to get the stats in the format you require, as below.

Define RichDataFrame and the implicits to use it:

import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{NumericType, StringType, StructField, StructType}

import scala.language.implicitConversions

class RichDataFrame(ds: DataFrame) {
  def statSummary(statistics: String*): DataFrame = {
    val defaultStatistics = Seq("max", "min", "mean", "std", "skewness", "kurtosis")
    val statFunctions = if (statistics.nonEmpty) statistics else defaultStatistics
    val selectedCols = ds.schema
      .filter(a => a.dataType.isInstanceOf[NumericType] || a.dataType.isInstanceOf[StringType])
      .map(_.name)

    val percentiles = statFunctions.filter(a => a.endsWith("%")).map { p =>
      try {
        p.stripSuffix("%").toDouble / 100.0
      } catch {
        case e: NumberFormatException =>
          throw new IllegalArgumentException(s"Unable to parse $p as a percentile", e)
      }
    }
    require(percentiles.forall(p => p >= 0 && p <= 1), "Percentiles must be in the range [0, 1]")
    // Build one aggregate expression per (column, statistic) pair.
    val aggExprs = selectedCols.flatMap(c => {
      var percentileIndex = 0
      statFunctions.map { stats =>
        if (stats.endsWith("%")) {
          val index = percentileIndex
          percentileIndex += 1
          expr(s"cast(percentile_approx($c, array(${percentiles.mkString(", ")}))[$index] as string)")
        } else {
          expr(s"cast($stats($c) as string)")
        }
      }
    })

    val aggResult = ds.select(aggExprs: _*).head()

    // Reshape the single result row into one row per selected column.
    val r = aggResult.toSeq.grouped(statFunctions.length).toArray
      .zip(selectedCols)
      .map{case(seq, column) => column +: seq }
      .map(Row.fromSeq)

    val output = StructField("columns", StringType) +: statFunctions.map(c => StructField(c, StringType))

    val spark = ds.sparkSession
    spark.createDataFrame(spark.sparkContext.parallelize(r), StructType(output))
  }
}

object RichDataFrame {

  trait Enrichment {
    implicit def enrichMetadata(ds: DataFrame): RichDataFrame =
      new RichDataFrame(ds)
  }

  object implicits extends Enrichment

}

Test with the data provided in the question:

    val df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")

    val columnsToCalculate = Seq("col_2","col_3","col_4")

    import com.som.spark.shared.RichDataFrame.implicits._
    df.selectExpr(columnsToCalculate: _*)
      .statSummary("mean", "count", "25%", "75%", "90%")
      .show(false)

    /**
      * +-------+------+-----+---+----+----+
      * |columns|mean  |count|25%|75% |90% |
      * +-------+------+-----+---+----+----+
      * |col_2  |977.2 |5    |200|2123|2321|
      * |col_3  |1421.4|5    |300|3123|3321|
      * |col_4  |1865.6|5    |400|4123|4321|
      * +-------+------+-----+---+----+----+
      */
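
If you need exact percentiles rather than approximate ones (see the comments below), swap percentile_approx for percentile (the Hive UDAF) inside statSummary. As a standalone, untested illustration:

    // Exact percentiles via the Hive percentile UDAF -- exact, but slower
    // than percentile_approx on large data.
    df.select(expr("percentile(col_2, array(0.25, 0.75, 0.90))").as("col_2_percentiles"))
      .show(false)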

Som
  • I did not interpret it this way, but then there is also describe; you may well be correct – thebluephantom Jul 28 '20 at 20:52
  • @Someshwar Kale, why is percentile_approx used here rather than percentile? I am looking for the exact percentile value instead – BdEngineer Jul 29 '20 at 05:13
  • @BdEngineer, if you want to use `percentile` then just replace `percentile_approx` with `percentile` in the `statSummary` method. The reason for using `percentile_approx` is that it is faster than `percentile`. More info: https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html – Som Jul 29 '20 at 06:06
  • @Someshwar Kale thanks a lot, but when I tried df.stats.percentile it says it won't recognize the "percentile" function, yet spark.sql("select percentile(x, 0.0) as percentile_0") works; I am confused. So (1) what is the difference, and (2) what is the reason for declaring a RichDataFrame in the above solution? – BdEngineer Jul 29 '20 at 08:36
  • 1
    `RichDataframe` has implicits to enrich current dataframe apis. when you specify `df.statSummary("25%")`, it call the method defined in RichDataframe. In that method "25%" uses `percentile_approx` (check this line `expr(s"cast(percentile_approx($c, array(${percentiles.mkString(", ")}))[$index] as string)")`). Now if you wanted to use `percentile` instead of `percentile_approx` just change the this -`expr(s"cast(percentile_approx($c, array(${percentiles.mkString(", ")}))[$index] as string)")` to `expr(s"cast(percentile($c, array(${percentiles.mkString(", ")}))[$index] as string)")`. – Som Jul 29 '20 at 08:52