
I have a list of aggregation functions, aliases and other settings as a JSON configuration, like:

{
    "aggregation": [{
        "alias_column_name1": {
            "sum": "<columnName1>"
        }
    }, {
        "alias_column_name2": {
            "sum": "<columnName2>"
        }
    }]
}

Currently I execute this with the following code:

val col1:Column = sum(<dataframeName>(<columnName1>)).alias(<alias_column_name1>)
val col2:Column = sum(<dataframeName>(<columnName2>)).alias(<alias_column_name2>)
dataframe.groupBy(..).agg(col1, col2)

But I have many aggregation configurations, and I want to pass a List of such columns to the aggregation method, like:

val colList = List[Column](col1, col2)
dataframe.groupBy(..).agg(colList)

How can I achieve this? Thanks

Versions:

Scala : 2.11
Spark : 2.2.2
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.2.2"

1 Answer


Separate list of columns and functions

Let's say you have a list of functions:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val funs: Seq[Column => Column] = Seq(sum _, min _, max _)

and a list of columns

val cols: Seq[Column] = Seq($"y", $"z")

and a dataset

val df = Seq((1, 2, 3), (1, 4, 5) ).toDF("x", "y", "z")

you can combine both

val exprs = for { c <- cols; f <- funs} yield f(c)

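and then pass them to agg. Its signature takes one Column followed by varargs, so the sequence is split into head and tail: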

df.groupBy($"x").agg(exprs.head, exprs.tail: _*)

The same thing could be done in PySpark:

from pyspark.sql import functions as F

funs = [F.sum, F.min, F.max]
cols = ["y", "z"]

df = spark.createDataFrame([(1, 2, 3), (1, 4, 5)], ("x", "y", "z"))

df.groupBy("x").agg(*[f(c) for c in cols for f in funs])

Predefined list of operations for each column

If you want to start with a predefined set of aliases, columns and functions, like the one shown in your question, it might be easier to restructure it as

trait AggregationOp {
  def expr: Column
}

case class FuncAggregationOp(c: Column, func: Column => Column, alias: String
    ) extends AggregationOp {
  def expr = func(c).alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   FuncAggregationOp($"y", sum _, "alias_column_name1"),
   FuncAggregationOp($"z", sum _, "alias_column_name2")
)
val exprs = ops.map(_.expr)

df.groupBy($"x").agg(exprs.head, exprs.tail: _*)

You can easily adjust this to handle other cases, for example when the column and function are given as strings:

case class StringAggregationOp(c: String, func: String, alias: String
    ) extends AggregationOp {
  def expr = org.apache.spark.sql.functions.expr(s"${func}(`${c}`)").alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   StringAggregationOp("y", "sum", "alias_column_name1"),
   StringAggregationOp("z", "sum", "alias_column_name2")
)

A Python equivalent could be something like this:

from collections import namedtuple
from pyspark.sql import functions as F

class AggregationOp(namedtuple("Op", ["c", "func", "alias"])):
    def expr(self):
        if callable(self.func):
            return self.func(self.c).alias(self.alias)
        else:
            return F.expr("{func}(`{c}`)".format
                (func = self.func, c = self.c)).alias(self.alias)

ops = [
    AggregationOp("y", "sum", "alias_column_name1"),
    AggregationOp("z", "sum", "alias_column_name2")
]

df.groupBy("x").agg(*[op.expr() for op in ops])
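
To get from the JSON configuration in the question to such a list, something like the following might work. This is only a sketch: it assumes the config has exactly the shape shown in the question (a list of single-entry objects mapping alias to {function: column}), and parse_aggregations is a hypothetical helper name, not part of Spark. It uses only the standard library and returns plain (column, func, alias) triples.

```python
import json


def parse_aggregations(config_text):
    """Return (column, func, alias) triples from a question-style config.

    Hypothetical helper: assumes {"aggregation": [{alias: {func: column}}, ...]}.
    """
    config = json.loads(config_text)
    triples = []
    for entry in config["aggregation"]:
        # Each entry maps an alias to a {function name: column name} object.
        for alias, spec in entry.items():
            for func, column in spec.items():
                triples.append((column, func, alias))
    return triples


# Example config; "y" and "z" stand in for the real column names.
config_text = """
{
    "aggregation": [
        {"alias_column_name1": {"sum": "y"}},
        {"alias_column_name2": {"sum": "z"}}
    ]
}
"""

triples = parse_aggregations(config_text)
print(triples)
```

Each triple has the same field order as the AggregationOp namedtuple above, so ops = [AggregationOp(*t) for t in triples] should give a list you can feed to agg in the same way.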
