
I followed a post on Stack Overflow about returning the maximum value of a column grouped by another column, and got an unexpected Java exception.

Here is the test data:

import pyspark.sql.functions as f
data = [('a', 5), ('a', 8), ('a', 7), ('b', 1), ('b', 3)]
df = spark.createDataFrame(data, ["A", "B"])
df.show()

+---+---+
|  A|  B|
+---+---+
|  a|  5|
|  a|  8|
|  a|  7|
|  b|  1|
|  b|  3|
+---+---+

Here is the solution that allegedly works for other users:

from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB').show()

which should produce this output:

+---+---+
|  A|  B|
+---+---+
|  a|  8|
|  b|  3|
+---+---+

Instead, I get:

java.lang.UnsupportedOperationException: Cannot evaluate expression: max(input[2, bigint, false]) windowspecdefinition(input[0, string, true], specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))

I have only tried this on Spark 2.4 on Databricks. I also tried the equivalent SQL syntax, shown below, and got the same error.
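The SQL version looked roughly like this (a reconstruction via a temp view; the exact statement I ran may have differed slightly):

df.createOrReplaceTempView("df")
spark.sql("""
    SELECT A, B
    FROM (
        SELECT A, B, max(B) OVER (PARTITION BY A) AS maxB
        FROM df
    ) t
    WHERE B = maxB
""").show()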


1 Answer


Databricks Support was able to reproduce the issue on Spark 2.4 but not on earlier versions. Apparently, it arises from a difference in the way the physical plan is formulated (I can post their response if requested). A fix is planned.

Meanwhile, here is one alternative solution to the original problem that does not run into the Spark 2.4 issue:

df.withColumn("maxB", f.max('B').over(w)).drop('B').distinct().show()

+---+----+
|  A|maxB|
+---+----+
|  b|   3|
|  a|   8|
+---+----+
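Another approach that should sidestep the problem, since it avoids window functions entirely, is a plain groupBy followed by a join back to the original rows (a sketch using the same df and f as above; I have not verified it against the 2.4 bug specifically):

# Compute the max of B per group, then join back to keep only the matching rows.
max_b = df.groupBy('A').agg(f.max('B').alias('B'))
df.join(max_b, on=['A', 'B'], how='inner').show()

This variant also keeps the original column name B in the result instead of introducing maxB.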