In Apache Spark 2.0+, how do I find the maximum of minimums in the following problem:

df1
+---+---+
| id| ts|
+---+---+
|  1| 20|
|  2| 15|
+---+---+

df2

+---+---+
| id| ts|
+---+---+
|  1| 10|
|  1| 25|
|  1| 36|
|  2| 25|
|  2| 35|
+---+---+

the desired dataframe is:

+---+---+
| id| ts|
+---+---+
|  1| 10|
|  2| 15|
+---+---+

Problem in words: for every id in df1, pick the maximum ts value in df2 that is less than the ts value in df1; if no such value exists, just use the ts value from df1.
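
For concreteness, here is the same rule applied to the sample rows in plain Python rather than Spark; df1_rows and df2_rows are just hypothetical dicts standing in for the two dataframes:

# Plain-Python sketch of the rule on the sample data
df1_rows = {1: 20, 2: 15}                  # id -> ts from df1
df2_rows = {1: [10, 25, 36], 2: [25, 35]}  # id -> ts values from df2

result = {
    i: max((t for t in df2_rows.get(i, []) if t < ts), default=ts)
    for i, ts in df1_rows.items()
}
print(result)  # {1: 10, 2: 15}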

Vigneshwaren
    Could you supplement this code request with your current attempts, please? Also, please read [How to make good reproducible Apache Spark Dataframe examples](https://stackoverflow.com/q/48427185/8371915). – Alper t. Turker Jan 29 '18 at 17:07

1 Answer

Just join first, so the df1.ts threshold is visible, flag the df2.ts values below it with when, aggregate with max (which ignores nulls), and fall back to df1.ts with coalesce:

from pyspark.sql.functions import coalesce, col, when, max as max_

df1 = spark.createDataFrame(
    [(1, 20), (2, 15)], ("id", "ts")
)
df2 = spark.createDataFrame(
    [(1, 10), (1, 25), (1, 36), (2, 25), (2, 35)], ("id", "ts")
)

# df2.ts when it is below the df1.ts threshold, null otherwise
ts_below = when(
    col("df2.ts") < col("df1.ts"), col("df2.ts")
).alias("ts_below")

(df1.alias("df1")
    .join(df2.alias("df2"), ["id"], "left")
    .select("id", col("df1.ts").alias("df1_ts"), ts_below)
    .groupBy("id", "df1_ts")
    .agg(max_("ts_below").alias("max_below"))
    .select("id", coalesce("max_below", "df1_ts").alias("ts"))
    .show())

# +---+---+
# | id| ts|
# +---+---+
# |  1| 10|
# |  2| 15|
# +---+---+

Because the join starts from df1 with a left outer join and max ignores nulls, ids that have no rows in df2 at all, or no df2.ts below the threshold, automatically fall back to df1.ts through coalesce, so no separate right outer join is needed.
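
For reference, the same logic reads naturally in Spark SQL as well; a minimal sketch, assuming df1 and df2 are registered as temp views under those names:

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql("""
    SELECT df1.id,
           COALESCE(MAX(CASE WHEN df2.ts < df1.ts THEN df2.ts END), df1.ts) AS ts
    FROM df1
    LEFT JOIN df2 ON df1.id = df2.id
    GROUP BY df1.id, df1.ts
""").show()

The CASE expression plays the role of when, and MAX skips the resulting NULLs, so this should produce the same (1, 10), (2, 15) result.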
Alper t. Turker