
I have a dataframe with a lot of columns, but for this example, we can use this one:

`val dfIn = sqlContext.createDataFrame(Seq(("r0", 0, 1, 2, "a0"), ("r1", 1, 2, 0, "a1"), ("r2", 2, 0, 1, "a2"))).toDF("prev_column", "c0", "c1", "c2", "post_column")`

That produces a dataframe like this:

+-----------+---+---+---+-----------+
|prev_column|c0 |c1 |c2 |post_column|
+-----------+---+---+---+-----------+
|r0         |0  |1  |2  |a0         |
|r1         |1  |2  |0  |a1         |
|r2         |2  |0  |1  |a2         |
+-----------+---+---+---+-----------+

For each record, I need the names of the two columns with the highest values.

I want to end up with a dataframe like this, but I cannot figure out how:

+-----------+---+---+---+-----------+-----+------+
|prev_column|c0 |c1 |c2 |post_column|first|second|
+-----------+---+---+---+-----------+-----+------+
|r0         |0  |1  |2  |a0         |c2   |c1    |
|r1         |1  |2  |0  |a1         |c1   |c0    |
|r2         |2  |0  |1  |a2         |c0   |c2    |
+-----------+---+---+---+-----------+-----+------+

I saw some answers for PySpark and NumPy, but none for Scala.

2 Answers


There are probably multiple ways to achieve this with UDFs. Without UDFs, you can unpivot the dataframe, find the highest-value columns via a Window operation, and then group everything back together. The code below works for any number of columns to be ranked; you just need to change colsForRanking accordingly.

First, unpivot (stack) the data:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax

val colsForRanking = List("c0", "c1", "c2")
val remainingCols = dfIn.columns.filterNot(colsForRanking.contains(_))
val stackString = (colsForRanking.size.toString +: colsForRanking
  .flatMap(c => List(s"'$c'", c))).mkString(",") // 3,'c0',c0,'c1',c1,'c2',c2

val df_unpivot = dfIn.select(
  dfIn.columns.map(col) :+ expr(s"stack($stackString) as (colName, value)"): _*)

This adds colName and value columns by duplicating the original data for all cNs and putting each cN in its own row.
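For the sample data, df_unpivot should look roughly like this (rows for r1 and r2 elided):

+-----------+---+---+---+-----------+-------+-----+
|prev_column|c0 |c1 |c2 |post_column|colName|value|
+-----------+---+---+---+-----------+-------+-----+
|r0         |0  |1  |2  |a0         |c0     |0    |
|r0         |0  |1  |2  |a0         |c1     |1    |
|r0         |0  |1  |2  |a0         |c2     |2    |
+-----------+---+---+---+-----------+-------+-----+

Next, you need to rank the rows using a Window: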

val w = Window.partitionBy(remainingCols.map(col): _*)
  .orderBy($"value".desc) // we don't need _all_ columns for the partition, but have to make sure the combination is unique

val df_ranked = df_unpivot
  .withColumn("valueRank", row_number.over(w))
  .withColumn("first", when($"valueRank" === 1, $"colName"))
  .withColumn("second", when($"valueRank" === 2, $"colName"))

when without an otherwise yields NULL wherever the condition isn't met, which will be useful in the last step.
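For the r0 partition, df_ranked should therefore look roughly like this:

+-----------+---+---+---+-----------+-------+-----+---------+-----+------+
|prev_column|c0 |c1 |c2 |post_column|colName|value|valueRank|first|second|
+-----------+---+---+---+-----------+-------+-----+---------+-----+------+
|r0         |0  |1  |2  |a0         |c2     |2    |1        |c2   |null  |
|r0         |0  |1  |2  |a0         |c1     |1    |2        |null |c1    |
|r0         |0  |1  |2  |a0         |c0     |0    |3        |null |null  |
+-----------+---+---+---+-----------+-------+-----+---------+-----+------+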

Finally, you can group everything back together:

val df_result = df_ranked.groupBy(dfIn.columns.map(col): _*)
  .agg(
    first($"first", ignoreNulls = true).alias("first"),
    first($"second", ignoreNulls = true).alias("second")
  )

first() returns an arbitrary value within each group (as there is no sorting); however, since you made sure the values you don't want are NULL, ignoreNulls = true gives you exactly the value you want.
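df_result.show(false) should then match the desired output:

+-----------+---+---+---+-----------+-----+------+
|prev_column|c0 |c1 |c2 |post_column|first|second|
+-----------+---+---+---+-----------+-----+------+
|r0         |0  |1  |2  |a0         |c2   |c1    |
|r1         |1  |2  |0  |a1         |c1   |c0    |
|r2         |2  |0  |1  |a2         |c0   |c2    |
+-----------+---+---+---+-----------+-----+------+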


You can do it using .when() and .otherwise(), which are similar to if/else statements in other programming languages:

import org.apache.spark.sql.functions.{col, when}

val dfIn = spark.createDataFrame(Seq(
  ("r0", 0, 1, 2, "a0"),
  ("r1", 1, 2, 0, "a1"),
  ("r2", 2, 0, 1, "a2")
)).toDF("prev_column", "c0", "c1", "c2", "post_column")

val resultDf = dfIn.withColumn("first",
  // the largest column is >= both of the others
  when(col("c0").geq(col("c1")).and(col("c0").geq(col("c2"))), "c0")
    .when(col("c1").geq(col("c0")).and(col("c1").geq(col("c2"))), "c1")
    .otherwise("c2")
).withColumn("second",
  // the second-largest column lies between the other two
  when(col("c0").between(col("c1"), col("c2"))
    .or(col("c0").between(col("c2"), col("c1"))), "c0")
    .when(col("c1").between(col("c0"), col("c2"))
      .or(col("c1").between(col("c2"), col("c0"))), "c1")
    .otherwise("c2")
)
resultDf.show(false)

+-----------+---+---+---+-----------+-----+------+
|prev_column|c0 |c1 |c2 |post_column|first|second|
+-----------+---+---+---+-----------+-----+------+
|r0         |0  |1  |2  |a0         |c2   |c1    |
|r1         |1  |2  |0  |a1         |c1   |c0    |
|r2         |2  |0  |1  |a2         |c0   |c2    |
+-----------+---+---+---+-----------+-----+------+
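Hardcoding the comparisons like this only works for exactly three columns. As a sketch of how it could generalize (not part of either answer; cols, ranked and dfTop2 are illustrative names, and it assumes Spark 2.4+, where arrays of structs are sortable), you can sort an array of (value, name) structs and take the first two names:

import org.apache.spark.sql.functions._

// Build an array of (value, name) structs and sort it descending;
// structs compare field by field, so value decides the order.
val cols = Seq("c0", "c1", "c2")
val ranked = sort_array(
  array(cols.map(c => struct(col(c).as("value"), lit(c).as("name"))): _*),
  asc = false)

// The names of the first two entries are the top-two column names.
val dfTop2 = dfIn
  .withColumn("first", ranked.getItem(0).getField("name"))
  .withColumn("second", ranked.getItem(1).getField("name"))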