I have successfully implemented a groupBy function that groups a DataFrame and applies an aggregation to the chosen columns. The problem is that the chosen columns and their aggregations are passed as a Map[String, String], so multiple aggregations (for example sum, mean, and max) cannot be performed on the same column. Below is what works so far:
groupByFunction(input, Map("someSignal" -> "mean"))
def groupByFunction(dataframeDummy: DataFrame,
                    columnsWithOperation: Map[String, String],
                    someSession: String = "sessionId",
                    someSignal: String = "signalName"): DataFrame = {
  dataframeDummy
    .groupBy(
      col(someSession),
      col(someSignal)
    ).agg(columnsWithOperation)
}
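The limitation of Map[String, String] described above can be reproduced in plain Scala, without Spark. This is only a minimal sketch: the object name `MapLimitDemo` and the value names `single` and `multi` are made up for illustration. It shows that repeating a key keeps only the last binding, whereas a Map[String, Seq[String]] can hold several operations per column.

```scala
object MapLimitDemo {
  // A Map holds one value per key, so the repeated key is overwritten:
  // only the last binding ("max") survives.
  val single: Map[String, String] =
    Map("someSignal" -> "mean", "someSignal" -> "sum", "someSignal" -> "max")

  // One possible alternative shape (an assumption, not the original signature):
  // keep a sequence of operations per column.
  val multi: Map[String, Seq[String]] =
    Map("someSignal" -> Seq("mean", "sum", "max"))

  def main(args: Array[String]): Unit = {
    println(single.size)               // 1
    println(single("someSignal"))      // max
    println(multi("someSignal").size)  // 3
  }
}
```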
After looking into it further, I found that the agg function can also take several column expressions, like below:
userData
  .groupBy(
    window(
      (col(timeStampColumnName) / lit(millisSecondsPerSecond)).cast(TimestampType),
      timeWindowInS.toString.concat(" seconds")
    ),
    col(sessionColumnName),
    col(signalColumnName)
  ).agg(
    mean("physicalSignalValue"),
    sum("physicalSignalValue")).show()
So I decided to transform the input into that form. This is how I did it:
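// Here columnsWithOperation is a Map[String, Seq[String]]: one list of operations per column
val signalIdColumn = columnsWithOperation.toSeq.flatMap { case (key, list) => list.map(key -> _) }
val result = signalIdColumn.map { tuple =>
  if (tuple._2 == "mean")
    mean(tuple._1)
  else if (tuple._2 == "sum")
    sum(tuple._1)
  else if (tuple._2 == "max")
    max(tuple._1)
  else
    // Without a final else branch, result is inferred as Seq[Any] instead of Seq[Column]
    throw new IllegalArgumentException(s"Unsupported aggregation: ${tuple._2}")
}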
Now I have a list of columns, but I still cannot pass that list directly to the agg function.
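One way this could be approached, sketched here under assumptions rather than as a definitive solution: Spark's `agg(expr: Column, exprs: Column*)` takes a first column plus varargs, so a `Seq[Column]` can be split into its head and the expanded tail. The object name `MultiAgg` and the helpers `toAggColumn` and `toAggColumns` are hypothetical, and the parameter type is assumed to change to Map[String, Seq[String]].

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, max, mean, sum}

object MultiAgg {
  // Hypothetical helper: map an operation name to a Spark aggregate column.
  def toAggColumn(columnName: String, operation: String): Column = operation match {
    case "mean" => mean(columnName)
    case "sum"  => sum(columnName)
    case "max"  => max(columnName)
    case other  => throw new IllegalArgumentException(s"Unsupported aggregation: $other")
  }

  // Hypothetical helper: flatten column -> operations into one aggregate column each.
  def toAggColumns(columnsWithOperation: Map[String, Seq[String]]): Seq[Column] =
    columnsWithOperation.toSeq.flatMap { case (columnName, operations) =>
      operations.map(operation => toAggColumn(columnName, operation))
    }

  def groupByFunction(dataframeDummy: DataFrame,
                      columnsWithOperation: Map[String, Seq[String]],
                      someSession: String = "sessionId",
                      someSignal: String = "signalName"): DataFrame = {
    val aggColumns = toAggColumns(columnsWithOperation)
    require(aggColumns.nonEmpty, "At least one aggregation is required")
    dataframeDummy
      .groupBy(col(someSession), col(someSignal))
      // agg needs one Column followed by varargs, hence head and tail: _*
      .agg(aggColumns.head, aggColumns.tail: _*)
  }
}
```

With this shape, a call might look like `MultiAgg.groupByFunction(input, Map("physicalSignalValue" -> Seq("mean", "sum", "max")))`. Throwing on an unknown operation name is a design choice made for the sketch; returning an Option or Either would work as well.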