This question is based on Sumit's answer at the link below:

Spark SQL: apply aggregate functions to a list of columns

Here are the details:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val Claim1 = StructType(Seq(
  StructField("pid", StringType, true),
  StructField("diag1", StringType, true),
  StructField("diag2", StringType, true),
  StructField("allowed", IntegerType, true),
  StructField("allowed1", IntegerType, true)))

val claimsData1 = Seq(
  ("PID1", "diag1", "diag2", 100, 200),
  ("PID1", "diag2", "diag3", 300, 600),
  ("PID1", "diag1", "diag5", 340, 680),
  ("PID2", "diag3", "diag4", 245, 490),
  ("PID2", "diag2", "diag1", 124, 248))

val claimRDD1 = sc.parallelize(claimsData1)
val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5))
val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1)

val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")
claimRDD2DF1.groupBy("pid").agg(exprs).show(false)

However, this approach doesn't provide an alias for naming the new columns. I have a DataFrame where I need to perform multiple aggregations (sum, avg, min, max) over several sets of columns. Is there a way to resolve this, or a better way to achieve it?

Thanks in advance.

Babu

2 Answers


Your code works with a slight modification; the trick is to use callUDF, which takes the aggregation function name as a String and can be aliased:

import org.apache.spark.sql.functions._

val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")

val aggExpr = exprs.map { case (k, v) => callUDF(v, col(k)).as(k) }.toList

claimRDD2DF1.groupBy("pid").agg(aggExpr.head, aggExpr.tail: _*)
  .show()

Or if you can specify your aggregation as a function object, you don't need to use callUDF:

import org.apache.spark.sql.Column

val aggExpr = Seq(
  ("allowed", sum(_: Column)),   // the (_: Column) annotation pins the Column overload
  ("allowed1", avg(_: Column))
).map { case (k, v) => v(col(k)).as(k) }

claimRDD2DF1.groupBy("pid").agg(aggExpr.head, aggExpr.tail: _*)
  .show()

Both versions give:

+----+-------+-----------------+
| pid|allowed|         allowed1|
+----+-------+-----------------+
|PID1|    740|493.3333333333333|
|PID2|    369|            369.0|
+----+-------+-----------------+
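
If you also want the alias to carry the function name (as the question asks), here is a small variation on the same callUDF idea, as an untested sketch: a Map allows only one function per column key, but a Seq of pairs lets each column take several aggregates, each with a distinct alias. The val names multiExprs/multiAgg are illustrative.

import org.apache.spark.sql.functions.{callUDF, col}

// Seq of (column, function-name) pairs: unlike a Map, the same column
// can appear with several functions, and the alias encodes both.
val multiExprs = Seq(
  "allowed"  -> "sum",
  "allowed"  -> "min",
  "allowed1" -> "avg",
  "allowed1" -> "max")

val multiAgg = multiExprs.map { case (k, v) => callUDF(v, col(k)).as(s"${k}_$v") }

claimRDD2DF1.groupBy("pid").agg(multiAgg.head, multiAgg.tail: _*).show(false)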
Raphael Roth

You can define a list of aggregate functions with aliases as below and use them:

import org.apache.spark.sql.functions._

// You should at least know the list of columns for each particular function
val colsToSum = claimRDD2DF1.columns.filter(_.startsWith("a"))
val colsToAvg = List("allowed", "allowed1")

// Define the functions and their aliases for each list of columns
val sumList = colsToSum.map(name => sum(name).as(name + "_sum"))
val avgList = colsToAvg.map(name => avg(name).as(name + "_avg"))

// Combine into a final list of expressions
val exp = sumList ++ avgList

// Apply all the aggregations in a single groupBy
claimRDD2DF1.groupBy("pid").agg(exp.head, exp.tail: _*).show(false)

This will give you:

+----+-----------+------------+------------------+-----------------+
|pid |allowed_sum|allowed1_sum|allowed_avg       |allowed1_avg     |
+----+-----------+------------+------------------+-----------------+
|PID1|740        |1480        |246.66666666666666|493.3333333333333|
|PID2|369        |738         |184.5             |369.0            |
+----+-----------+------------+------------------+-----------------+
koiralo
  • Thanks Shankar, I am trying to figure out whether this is the best way to do it or if there is another way; because I have thousands of columns it seems a bit heavy, and I'm looking for an easier way to define it. – Babu Dec 16 '19 at 14:20
  • I think defining the functions would be much the same if you have multiple functions. What you can do is create a list of columns (e.g. from claimRDD2DF1.columns) for each particular function and then build a final list; a sketch of this idea follows below. – koiralo Dec 16 '19 at 14:23
  • Yeah, I am looking for something concise, like the solutions given in the link I shared, but that was a single aggregation over multiple columns. – Babu Dec 16 '19 at 14:27
  • Thank you, it is better. – Babu Dec 16 '19 at 14:58
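
Following up on the comment thread, here is a possible generalization of koiralo's suggestion, as an untested sketch: drive everything from a map of function name to column list, so thousands of columns never have to be written out by hand. The aggFor helper and the IntegerType filter are illustrative assumptions, not from the original answers.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{avg, col, max, min, sum}
import org.apache.spark.sql.types.IntegerType

// Hypothetical helper: map a function name to the matching built-in aggregate.
def aggFor(name: String): Column => Column = name match {
  case "sum" => sum(_: Column)
  case "avg" => avg(_: Column)
  case "min" => min(_: Column)
  case "max" => max(_: Column)
}

// Assumption: every IntegerType column gets every function; replace the
// per-function lists with whatever subsets you actually need.
val numericCols = claimRDD2DF1.schema.fields
  .collect { case f if f.dataType == IntegerType => f.name }
  .toSeq

val funcsToCols: Map[String, Seq[String]] =
  Map("sum" -> numericCols, "avg" -> numericCols,
      "min" -> numericCols, "max" -> numericCols)

// Build one aliased expression per (function, column) pair.
val allAggs = for {
  (fn, cols) <- funcsToCols.toSeq
  c          <- cols
} yield aggFor(fn)(col(c)).as(s"${c}_$fn")

claimRDD2DF1.groupBy("pid").agg(allAggs.head, allAggs.tail: _*).show(false)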