There are multiple options to achieve this. I am providing a worked example for one and hints for the rest:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from pyspark.sql import types as T
data = [("ID1", 3, 5), ("ID2", 4, 12), ("ID3", 8, 3)]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()
+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1| 3| 5|
|ID2| 4| 12|
|ID3| 8| 3|
+---+----+----+
# F.array builds an array of [column_name, value] pairs, e.g. [['colA', 3], ['colB', 5]]; F.explode then turns each pair into its own row
df = df.withColumn(
    "max_val",
    F.explode(
        F.array([
            F.array([F.lit(cl), F.col(cl)]) for cl in df.columns[1:]
        ])
    )
)
df.show()
+---+----+----+----------+
| ID|colA|colB| max_val|
+---+----+----+----------+
|ID1| 3| 5| [colA, 3]|
|ID1| 3| 5| [colB, 5]|
|ID2| 4| 12| [colA, 4]|
|ID2| 4| 12|[colB, 12]|
|ID3| 8| 3| [colA, 8]|
|ID3| 8| 3| [colB, 3]|
+---+----+----+----------+
# Split each pair into its own column; the value was coerced to string inside the array, so cast it back to integer
df = df.select(
    "ID",
    "colA",
    "colB",
    F.col("max_val").getItem(0).alias("col_name"),
    F.col("max_val").getItem(1).cast(T.IntegerType()).alias("col_value"),
)
df.show()
+---+----+----+--------+---------+
| ID|colA|colB|col_name|col_value|
+---+----+----+--------+---------+
|ID1| 3| 5| colA| 3|
|ID1| 3| 5| colB| 5|
|ID2| 4| 12| colA| 4|
|ID2| 4| 12| colB| 12|
|ID3| 8| 3| colA| 8|
|ID3| 8| 3| colB| 3|
+---+----+----+--------+---------+
# Rank values within each ID, highest value first
df = df.withColumn(
    "rank",
    F.rank().over(W.partitionBy("ID").orderBy(F.col("col_value").desc()))
)
df.show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2| 4| 12| colB| 12| 1|
|ID2| 4| 12| colA| 4| 2|
|ID3| 8| 3| colA| 8| 1|
|ID3| 8| 3| colB| 3| 2|
|ID1| 3| 5| colB| 5| 1|
|ID1| 3| 5| colA| 3| 2|
+---+----+----+--------+---------+----+
# Finally, filter rank = 1: the max value per ID ranks first because we ordered descending (note that rank() keeps ties; use row_number() if you need exactly one row per ID)
df.where("rank=1").show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2| 4| 12| colB| 12| 1|
|ID3| 8| 3| colA| 8| 1|
|ID1| 3| 5| colB| 5| 1|
+---+----+----+--------+---------+----+
Other options are:
- Use a UDF on your base df that returns the name of the column holding the max value (sketched below).
- In the same example, after building the col_name and col_value columns, group by ID and take max(col_value) instead of ranking, then join the result back to the previous df (also sketched below).
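A minimal sketch of the UDF option, assuming the base three-column df created at the top of this answer (max_col is a name made up here for illustration):
from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.udf(returnType=T.StringType())
def max_col(colA, colB):
    # Return the name of the column holding the larger value
    return "colA" if colA >= colB else "colB"

df.withColumn("col_name", max_col(F.col("colA"), F.col("colB"))).show()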
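And a minimal sketch of the groupBy + join option, assuming df is the exploded frame with col_name and col_value from the select step above (before the rank column is added):
max_df = df.groupBy("ID").agg(F.max("col_value").alias("col_value"))
max_df.join(df, on=["ID", "col_value"], how="inner").show()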