1. Create a column with the max in each row.
2. List the columns in which that max value can be found.
3. Eliminate the nulls from that list (the unmatched entries are nulls, not NaNs).
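For reference, a toy frame matching the output further down can be built like this (column names and values taken from that output; the session setup is just illustrative):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 10, 5, 21, -9), (2, 87, 1, 1, 1), (3, 1, 95, 1, 1)],
    ['ID', 'Col1', 'Col2', 'Col3', 'ColN'],
)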
Code below (the wildcard import and the unused Window spec from my first attempt are gone):
import pyspark.sql.functions as F
df = (
    df.withColumn(
        'max',
        F.greatest(*[F.col(c) for c in df.columns[1:]])  # row-wise max over the value columns (ID excluded)
    )
    .withColumn(
        'maxcol',
        # Keep each column's name where its value equals the row max, null otherwise
        F.array(*[F.when(F.col(c) == F.col('max'), F.lit(c)) for c in df.columns[1:]])
    )
    .withColumn(
        'maxcol',
        F.expr('filter(maxcol, x -> x is not null)')  # drop the nulls from non-matching columns
    )
)
df.show()  # assign first, then show(); chaining .show() into the assignment would set df to None
+---+----+----+----+----+---+------+
| ID|Col1|Col2|Col3|ColN|max|maxcol|
+---+----+----+----+----+---+------+
| 1| 10| 5| 21| -9| 21|[Col3]|
| 2| 87| 1| 1| 1| 87|[Col1]|
| 3| 1| 95| 1| 1| 95|[Col2]|
+---+----+----+----+----+---+------+
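If you would rather avoid the SQL string, the null filtering can also be written with F.filter, which takes a Python lambda (available since Spark 3.1); a minimal sketch:
# Equivalent to expr("filter(maxcol, x -> x is not null)"), Spark 3.1+
df = df.withColumn('maxcol', F.filter('maxcol', lambda x: x.isNotNull()))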
You could also do this with applyInPandas, though I am not sure how it compares in efficiency:
import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType, ArrayType, StringType

def max_col(a: pd.DataFrame) -> pd.DataFrame:
    # Boolean mask: True where a value column equals its row max (ID excluded)
    s = a.iloc[:, 1:].eq(a.iloc[:, 1:].max(axis=1), axis=0)
    # Collect the names of the matching columns into a list per row
    return a.assign(maxcol=s.agg(lambda x: x.index[x].tolist(), axis=1))
schema = StructType([
    StructField('ID', LongType(), True),
    StructField('Col1', LongType(), True),
    StructField('Col2', LongType(), True),
    StructField('Col3', LongType(), True),
    StructField('ColN', LongType(), True),
    StructField('maxcol', ArrayType(StringType(), True), False),
])
df.groupby('ID').applyInPandas(max_col, schema).show()
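Note that grouping by ID makes every group a single row here, so max_col effectively runs row by row. To see what the mask computes, here is the same logic on a plain pandas frame (values taken from the first example row):
import pandas as pd

pdf = pd.DataFrame({'ID': [1], 'Col1': [10], 'Col2': [5], 'Col3': [21], 'ColN': [-9]})
mask = pdf.iloc[:, 1:].eq(pdf.iloc[:, 1:].max(axis=1), axis=0)
print(mask.agg(lambda x: x.index[x].tolist(), axis=1))  # 0    [Col3]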