
I have a dataframe storing levels per quarter, df1:

| id  | year | quarter | level  |
|-----|------|---------|--------|
| 111 | 2021 | 1       | Silver |
| 111 | 2021 | 2       | Gold   |
| 222 | 2021 | 1       | Bronze |
| 222 | 2021 | 2       | Silver |

I also have another dataframe, df2, which stores a single level per id with no quarter breakdown:

| id  | level  |
|-----|--------|
| 111 | Bronze |
| 222 | Gold   |
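
For reference, here is a minimal snippet to reproduce the two dataframes (a sketch assuming an existing SparkSession named spark):

df1 = spark.createDataFrame(
    [(111, 2021, 1, 'Silver'), (111, 2021, 2, 'Gold'),
     (222, 2021, 1, 'Bronze'), (222, 2021, 2, 'Silver')],
    ['id', 'year', 'quarter', 'level']
)
df2 = spark.createDataFrame([(111, 'Bronze'), (222, 'Gold')], ['id', 'level'])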

I want to calculate the max level across both dataframes, but a plain max won't work because strings are compared alphabetically, so 'Gold' < 'Silver'. Is there a way to do a custom max that captures the rule Gold > Silver > Bronze?

My expected output would look like this:

| id  | year | quarter | level  |
|-----|------|---------|--------|
| 111 | 2021 | 1       | Silver |
| 111 | 2021 | 2       | Gold   |
| 222 | 2021 | 1       | Gold   |
| 222 | 2021 | 2       | Gold   |

I tried this before running into the issue:

import pyspark.sql.functions as F

output = (
    df1.join(df2, on=['id'])
    .groupby('id', 'year', 'quarter')
    .agg(
        F.max(F.col('level')).alias('level')  # lexicographic max: ranks Silver above Gold
    )
)
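
The underlying problem is easy to confirm even in plain Python, since string comparison is alphabetical:

max(['Bronze', 'Silver', 'Gold'])  # returns 'Silver', because 'S' sorts after 'B' and 'G'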
  • I would assume that if the column values are already sorted by level, this shouldn't be a problem. Maybe [this](https://stackoverflow.com/questions/13838405/custom-sorting-in-pandas-dataframe) post works for the sorting? – Thymen Mar 25 '21 at 16:11
  • I would prefer a PySpark solution over Pandas, if possible :) – stackq Mar 25 '21 at 16:12
  • Sorry, I didn't even notice that it was a PySpark problem until I checked the tags just now. All of the terms and code felt like pandas to me. But [this](https://stackoverflow.com/questions/60536565/custom-sorting-in-pyspark-dataframes) post shows custom string sorting for PySpark. – Thymen Mar 25 '21 at 16:17

2 Answers


You can map each level to a numeric rank, take the higher of the two ranks with greatest, and then index into an array to translate that rank back into a level name:

import pyspark.sql.functions as F

df = df1.alias('df1').join(df2.alias('df2'), 'id').select(
    'id', 'year', 'quarter', 
    F.expr("""
        array('Bronze', 'Silver', 'Gold')[
            greatest(
                map('Bronze', 0, 'Silver', 1, 'Gold', 2)[df1.level],
                map('Bronze', 0, 'Silver', 1, 'Gold', 2)[df2.level]
            )
        ] as level
    """)
)

df.show()
+---+----+-------+------+
| id|year|quarter| level|
+---+----+-------+------+
|111|2021|      1|Silver|
|111|2021|      2|  Gold|
|222|2021|      1|  Gold|
|222|2021|      2|  Gold|
+---+----+-------+------+

For Spark 2.4 and above, you can use array_position instead:

df = df1.alias('df1').join(df2.alias('df2'), 'id').withColumn(
    'mapping', 
    F.expr("array('Bronze', 'Silver', 'Gold')")
).select(
    'id', 'year', 'quarter', 
    F.col('mapping')[
        # array_position is 1-based while [] indexing is 0-based, hence the - 1
        F.expr("greatest(array_position(mapping, df1.level), array_position(mapping, df2.level)) - 1")
    ].alias('level')
)
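
This yields the same result as the first version:

df.show()
+---+----+-------+------+
| id|year|quarter| level|
+---+----+-------+------+
|111|2021|      1|Silver|
|111|2021|      2|  Gold|
|222|2021|      1|  Gold|
|222|2021|      2|  Gold|
+---+----+-------+------+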
– mck

You can also define your custom ordering using a when expression, pack it into a struct together with the level, and use the greatest function on the structs to get the max. Structs are compared field by field, so the numeric rank in the first field decides the ordering:

import pyspark.sql.functions as F

order = (F.when(F.col("level") == "Gold", 3)
         .when(F.col("level") == "Silver", 2)
         .when(F.col("level") == "Bronze", 1))

df1 = df1.withColumn("level", F.struct(order, F.col("level")))
df2 = df2.withColumn("level", F.struct(order, F.col("level")))

result = df1.alias("df1").join(df2.alias("df2"), ["id"]).select(
    "id", "year", "quarter",
    F.greatest(F.col("df1.level"), F.col("df2.level")).getField("level").alias("level")
)

result.show()
# +---+----+-------+------+
# | id|year|quarter| level|
# +---+----+-------+------+
# |222|2021|      1|  Gold|
# |222|2021|      2|  Gold|
# |111|2021|      1|Silver|
# |111|2021|      2|  Gold|
# +---+----+-------+------+

Or, starting again from the original df1 and df2, you can use a map literal to define the ordering and, in the same way, take greatest on structs:

order = F.create_map(*[F.lit(l) for l in ['Gold', 3, 'Silver', 2, 'Bronze', 1]])

df1 = df1.withColumn("level", F.struct(order.getItem(F.col("level")), F.col("level")))
df2 = df2.withColumn("level", F.struct(order.getItem(F.col("level")), F.col("level")))

result = df1.alias("df1").join(df2.alias("df2"), ["id"]).select(
    "id", "year", "quarter",
    F.greatest(F.col("df1.level"), F.col("df2.level")).getField("level").alias("level")
)
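
As before, the result matches the expected output (row order may differ):

result.show()
# +---+----+-------+------+
# | id|year|quarter| level|
# +---+----+-------+------+
# |222|2021|      1|  Gold|
# |222|2021|      2|  Gold|
# |111|2021|      1|Silver|
# |111|2021|      2|  Gold|
# +---+----+-------+------+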
– blackbishop