
I have three arrays of string type containing the following information:

  • groupBy array: containing names of the columns I want to group my data by.
  • aggregate array: containing names of columns I want to aggregate.
  • operations array: containing the aggregate operations I want to perform.

I am trying to use Spark data frames to achieve this. Spark data frames provide an agg() method where you can pass a Map[String, String] (of column name and respective aggregate operation) as input; however, I want to perform different aggregation operations on the same column of the data. Any suggestions on how to achieve this?
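For concreteness, a hypothetical set of inputs (column names invented for illustration) might look like this:

val groupBy = Array("department")
val aggregate = Array("salary")
val operations = Array("min", "max", "avg")  // several operations on the same column

and the goal is to compute min(salary), max(salary) and avg(salary) per department in a single pass.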

Richa Banker

6 Answers


Scala:

You can, for example, map over a list of functions with a defined mapping from name to function:

import org.apache.spark.sql.functions.{avg, col, max, min}
import org.apache.spark.sql.Column
import spark.implicits._  // or sqlContext.implicits._ on Spark 1.x; needed for .toDF on a local Seq

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+

or

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show

Unfortunately the parser which is used internally by SQLContext is not exposed publicly, but you can always try to build plain SQL queries:

df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")
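For the running example (groupBy = Seq("k"), aggregate = Seq("v"), operations = Seq("min", "max", "mean")) this builds a statement along the lines of `SELECT k, min(v) AS v_min, max(v) AS v_max, mean(v) AS v_mean FROM df GROUP BY k`.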

Python:

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)
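As in the Scala version, this produces one output column per (function, column) pair.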


zero323
  • @zero323 Do you know by any chance how to do this with python API? – lanenok Mar 18 '16 at 13:22
  • @lanenok Pretty much the same way. Just replace flatMap with comprehensions. – zero323 Mar 18 '16 at 13:28
  • @zero323 Thanks a lot for the solution that works great indeed! The question now is: how do I aggregate some given functions on some given values. For example, groupBy K, get aggregates of "v", and get count of "w". – Ahmet Jul 20 '16 at 19:43
  • @Ahmet You can pass any list of aggregations you want. These don't have to be on a single column, for example: `exprs = [mean("v"), count("w")]`. See also http://stackoverflow.com/a/33907419/1560062 – zero323 Jul 20 '16 at 19:47
  • I needed an extra import in order for the Scala code above to work: `import spark.implicits._` – jamiet May 08 '18 at 12:35
  • and in the first line the function you need to import is `avg`, not `mean` – jamiet May 08 '18 at 22:20
  • Could anyone tell me what is supposed to do the * in *groupBy and *exprs? Many thanks! – emilio.molina Mar 25 '19 at 22:38
  • @zero323 --- This one really great – vikrant rana May 16 '19 at 11:57
  • A very fast and pythonic answer! Thank you. For those who aren't grouping but just taking the aggregate across the entire dataframe, the reduced expression: df.agg(*exprs) Works in the same way – MisterJT Oct 03 '19 at 15:01

For those who wonder how @zero323's answer can be written without a list comprehension in Python:

from pyspark.sql.functions import min, max, col
# init your spark dataframe

expr = [min(col("valueName")), max(col("valueName"))]
df.groupBy("keyName").agg(*expr)
Rick
  • for what it's worth, I've used this technique very successfully in Python – jamiet May 07 '18 at 20:18
  • But zero323's list comprehension scales, whereas your manually crafted list does not. – Oliver W. Apr 24 '19 at 21:33
  • @OliverW. well the point is that you don't have to calculate all combinations of functions and columns, but you can specify only the pairs that you need. – Rick Apr 27 '19 at 17:33

Do something like:

from pyspark.sql import functions as F

df.groupBy('groupByColName') \
  .agg(F.sum('col1').alias('col1_sum'),
       F.max('col2').alias('col2_max'),
       F.avg('col2').alias('col2_avg')) \
  .show()

Here is another straightforward way to apply different aggregate functions to the same column while using Scala (this has been tested in Azure Databricks).

import org.apache.spark.sql.functions.{avg, max, min, round}

val groupByColName = "Store"
val colName = "Weekly_Sales"

df.groupBy(groupByColName)
  .agg(min(colName),
       max(colName),
       round(avg(colName), 2))
  .show()
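If you want friendlier output column names, each aggregate can also be aliased; a minimal variation of the same idea (alias names chosen arbitrarily here):

df.groupBy(groupByColName)
  .agg(min(colName).alias("min_weekly_sales"),
       max(colName).alias("max_weekly_sales"),
       round(avg(colName), 2).alias("avg_weekly_sales"))
  .show()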

For example, if you want to compute the fraction of zeroes in each column of a PySpark DataFrame, you can build one aggregate expression per column and execute them all in a single agg():

from pyspark.sql.functions import col, count, sum

def count_zero_percentage(c):
    # fraction of rows in which column c equals zero
    pred = (col(c) == 0).cast("integer")
    return (sum(pred) / count('*')).alias(c)

df.agg(*[count_zero_percentage(c) for c in df.columns]).show()
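Note that null values contribute to the denominator (count('*')) but not to the numerator, so they are effectively counted as non-zero.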
case class soExample(firstName: String, lastName: String, Amount: Int)

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for .toDF on a local Seq

val df = Seq(soExample("me", "zack", 100)).toDF

val grouped = df.groupBy("firstName", "lastName").agg(
     sum("Amount"),
     mean("Amount"),
     stddev("Amount"),
     count(lit(1)).alias("numOfRecords")
   )

display(grouped)  // display() is available in Databricks notebooks; use grouped.show() elsewhere

// Courtesy Zach ..

Zach's simplified answer for a post marked duplicate: Spark Scala Data Frame to have multiple aggregation of single Group By

user2458922