There are probably multiple ways to achieve this with UDFs. Without UDFs, you can unpivot the dataframe, find the highest-value column via a `Window` operation, and then re-group everything together. The code below works for any number of columns that should be ranked; you just need to change `colsForRanking` accordingly.
First, unpivot (`stack`) the data:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax

val colsForRanking = List("c0", "c1", "c2")
val remainingCols = dfIn.columns.filterNot(colsForRanking.contains(_))
val stackString = (colsForRanking.size.toString +: colsForRanking
  .flatMap(c => List(s"'$c'", c))).mkString(",") // 3,'c0',c0,'c1',c1,'c2',c2
val df_unpivot = dfIn.select(
  dfIn.columns.map(col) :+ expr(s"stack($stackString) as (colName, value)"): _*)
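For intuition, here's what the unpivot produces on a small hypothetical input (the `id` column and the values are assumptions, not from the question):

```scala
// Hypothetical sample input; `id` stands in for the non-ranked column(s):
val dfIn = Seq(("a", 10, 30, 20)).toDF("id", "c0", "c1", "c2")

// stack(3,'c0',c0,'c1',c1,'c2',c2) emits one (colName, value) pair per cN,
// so df_unpivot turns each input row into three rows:
// +---+---+---+---+-------+-----+
// | id| c0| c1| c2|colName|value|
// +---+---+---+---+-------+-----+
// |  a| 10| 30| 20|     c0|   10|
// |  a| 10| 30| 20|     c1|   30|
// |  a| 10| 30| 20|     c2|   20|
// +---+---+---+---+-------+-----+
```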
This adds `value` and `colName` columns by duplicating the original data for all `cN`s and putting each `cN` in an individual row. Next, you need to rank the rows using a `Window`:
val w = Window.partitionBy(remainingCols.map(col): _*)
.orderBy($"value".desc) // we don't need _all_ columns for the partition, but have to make sure the combination is unique
val df_ranked = df_unpivot
.withColumn("valueRank", row_number.over(w))
.withColumn("first", when($"valueRank" === 1, $"colName"))
.withColumn("second", when($"valueRank" === 2, $"colName"))
`when` without `otherwise` is NULL when the condition isn't met, which will be useful in the last step.
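A minimal standalone illustration of that NULL behavior (toy data, assumed):

```scala
// Rows whose condition fails get NULL because no otherwise() is supplied:
Seq(1, 2, 3).toDF("rank")
  .withColumn("tag", when($"rank" === 1, lit("top")))
  .show()
// rank 1 gets "top"; ranks 2 and 3 get a NULL tag
```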
Finally, you can group everything back together:
val df_result = df_ranked.groupBy(dfIn.columns.map(col): _*)
  .agg(
    first($"first", ignoreNulls = true).alias("first"),
    first($"second", ignoreNulls = true).alias("second"))
`first()` gives you an arbitrary value within each group (as there is no sorting); however, since you made sure the values you don't want are NULL and passed `ignoreNulls = true`, it skips those and returns exactly the value you want.
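Putting the three steps together on hypothetical sample data (the `id` column and the values are assumed, not from the question):

```scala
val dfIn = Seq(("a", 10, 30, 20), ("b", 5, 1, 4)).toDF("id", "c0", "c1", "c2")
// ...run the unpivot, ranking, and groupBy steps above...
df_result.show()
// For id "a": first = c1 (value 30), second = c2 (value 20).
// For id "b": first = c0 (value 5),  second = c2 (value 4).
```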