32

I am working on a PySpark DataFrame with n columns. I have a set of m columns (m < n), and my task is to choose the column with the max values in it.

For example:

Input: a PySpark DataFrame containing:

col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5]

Output:

col_4 = max(col_1, col_2, col_3) = [3,2,5]

There is something similar in pandas as explained in this question.

Is there any way of doing this in PySpark, or should I convert my PySpark df to a Pandas df and then perform the operations?

Hemant
    if the question is about getting the max value of each column, then it looks like the expected output should be [max(col_1), max(col_2), max(col_3)] = [3, 4, 5] – Quetzalcoatl Sep 22 '18 at 21:17

5 Answers

33

You can reduce using SQL expressions over a list of columns:

from pyspark.sql.functions import max as max_, col, when
from functools import reduce

def row_max(*cols):
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
    .toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max"))
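
To see what the fold in row_max does, here is a minimal plain-Python sketch of the same reduce applied to ordinary numbers instead of Spark columns. The helper name row_max_plain is hypothetical, and the conditional expression stands in for when(x > y, x).otherwise(y); no Spark is involved.

```python
from functools import reduce

def row_max_plain(*values):
    # Hypothetical stand-in for row_max: fold the values pairwise,
    # keeping the larger of each pair, just as when/otherwise does per row.
    return reduce(lambda x, y: x if x > y else y, values)

rows = [(1, 2, 3), (2, 1, 2), (3, 4, 5)]
maxima = [row_max_plain(*row) for row in rows]
print(maxima)  # [3, 2, 5]
```

In Spark the same fold builds one nested column expression that is evaluated for every row, rather than looping over rows in Python.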

Spark 1.5+ also provides least and greatest:

from pyspark.sql.functions import greatest

df.select(greatest("a", "b", "c"))

If you want to keep the name of the max column, you can use structs:

from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))

Finally, you can use the above to select the "top" column:

# Import under an alias so the Python built-in max is not shadowed
from pyspark.sql.functions import max as max_

((_, c), ) = (maxs
    .groupBy(col("maxs")["col"].alias("col"))
    .count()
    .agg(max_(struct(col("count"), col("col"))))
    .first())

df.select(c)
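
The struct approach relies on Spark comparing structs field by field, so putting the value first makes greatest pick the largest value while carrying its column name along. A plain-Python sketch of the same idea, assuming tuples as a stand-in for structs (the helper name row_max_with_name_plain is hypothetical and no Spark is involved):

```python
from collections import Counter

columns = ("a", "b", "c")
rows = [(1, 2, 3), (2, 1, 2), (3, 4, 5)]

def row_max_with_name_plain(row):
    # Hypothetical helper: pair each value with its column name.
    # Tuples compare on the value first and on the name only to break ties.
    return max(zip(row, columns))

maxs = [row_max_with_name_plain(row) for row in rows]

# Count how often each column holds the row maximum, then take the most frequent.
counts = Counter(name for _, name in maxs)
_, top = max((count, name) for name, count in counts.items())

print(maxs)  # [(3, 'c'), (2, 'c'), (5, 'c')]
print(top)   # c
```

In the second row, a and c tie at 2, and the name comparison breaks the tie in favour of c.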
zero323
    this is very helpful! how do you find second largest instead? I want to get the name of the second largest column – user1569341 Aug 07 '19 at 16:28
29

We can use greatest:

Creating DataFrame

df = spark.createDataFrame(
    [[1,2,3], [2,1,2], [3,4,5]], 
    ['col_1','col_2','col_3']
)
df.show()
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    1|    2|    3|
|    2|    1|    2|
|    3|    4|    5|
+-----+-----+-----+

Solution

from pyspark.sql.functions import greatest
df2 = df.withColumn('max_by_rows', greatest('col_1', 'col_2', 'col_3'))

#Only if you need col
#from pyspark.sql.functions import col
#df2 = df.withColumn('max', greatest(col('col_1'), col('col_2'), col('col_3')))
df2.show()

+-----+-----+-----+-----------+
|col_1|col_2|col_3|max_by_rows|
+-----+-----+-----+-----------+
|    1|    2|    3|          3|
|    2|    1|    2|          2|
|    3|    4|    5|          5|
+-----+-----+-----+-----------+
ansev
12

You can also use the PySpark built-in least:

from pyspark.sql.functions import least, col
df = df.withColumn('min', least(col('c1'), col('c2'), col('c3')))
mattexx
0

Scala solution:

val df = sc.parallelize(Seq((10, 10, 1), (200, 2, 20), (3, 30, 300), (400, 40, 4))).toDF("c1", "c2", "c3")

// Compare the values as Ints; comparing them as Strings would order them lexicographically
df.rdd.map(row => Seq(row.getInt(0), row.getInt(1), row.getInt(2))).map(x => (x(0), x(1), x(2), x.min)).toDF("c1", "c2", "c3", "min").show

+---+---+---+---+  
| c1| c2| c3|min|  
+---+---+---+---+  
| 10| 10|  1|  1|    
|200|  2| 20|  2|  
|  3| 30|300|  3|  
|400| 40|  4|  4|  
+---+---+---+---+  
Will Vousden
0

Another simple way of doing it. Let us say that the df below is your DataFrame:

df = sc.parallelize([(10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4)]).toDF(["c1", "c2", "c3"])
df.show()

+---+---+---+
| c1| c2| c3|
+---+---+---+
| 10| 10|  1|
|200|  2| 20|
|  3| 30|300|
|400| 40|  4|
+---+---+---+

You can process the above df as below to get the minimum of each column:

from pyspark.sql.functions import lit, min

df.select( lit('c1').alias('cn1'), min(df.c1).alias('c1'),
           lit('c2').alias('cn2'), min(df.c2).alias('c2'),
           lit('c3').alias('cn3'), min(df.c3).alias('c3')
          )\
         .rdd.flatMap(lambda r: [ (r.cn1, r.c1), (r.cn2, r.c2), (r.cn3, r.c3)])\
         .toDF(['Column', 'Min']).show()

+------+---+
|Column|Min|
+------+---+
|    c1|  3|
|    c2|  2|
|    c3|  1|
+------+---+
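
Note that this aggregates down each column, whereas greatest and least work across each row. A plain-Python sketch of the difference on the same data (no Spark involved; the variable names are illustrative only):

```python
rows = [(10, 10, 1), (200, 2, 20), (3, 30, 300), (400, 40, 4)]

# Row-wise: one minimum per row, which is what least("c1", "c2", "c3") computes
row_mins = [min(row) for row in rows]

# Column-wise: one minimum per column, which is what min(df.c1), min(df.c2), ... compute
col_mins = [min(col) for col in zip(*rows)]

print(row_mins)  # [1, 2, 3, 4]
print(col_mins)  # [3, 2, 1]
```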
Rags