
Any idea how to write this in PySpark?

I have two PySpark DataFrames that I'm trying to union. However, there is one value I want to update when two column values (test_date and student_id) are duplicated across the DataFrames.

PyDf1:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-26|        655|          N|           A|
| 2022-09-26|        656|          N|           B|
| 2022-09-26|        657|          N|           C|
| 2022-09-26|        658|          N|           D|
+-----------+-----------+-----------+------------+

PyDf2:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-27|        655|          N|           D|
| 2022-09-27|        656|          N|           C|
| 2022-09-27|        657|          N|           B|
| 2022-09-27|        658|          N|           A|
| 2022-09-26|        655|          N|           B|  <- Duplicate test_date & student_id, different grade
+-----------+-----------+-----------+------------+

desired output:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-26|        655|          N|           B|  <- Updated to B for grade
| 2022-09-26|        656|          N|           B|
| 2022-09-26|        657|          N|           C|
| 2022-09-26|        658|          N|           D|
| 2022-09-27|        655|          N|           D|
| 2022-09-27|        656|          N|           C|
| 2022-09-27|        657|          N|           B|
| 2022-09-27|        658|          N|           A|
+-----------+-----------+-----------+------------+
Mick

2 Answers


Use window functions. Logic and code below:

from pyspark.sql import Window
from pyspark.sql.functions import col, desc, lead, to_date

# Within each (student_id, test_date) group, look at the next row's grade;
# the last row in the group has no "next" row, so its lead is null.
w = Window.partitionBy('student_id', 'test_date').orderBy(desc(to_date('test_date')))

df = (PyDf1.unionByName(PyDf2)              # union the two DataFrames
      .withColumn('CurrentGrade', lead('grade').over(w))
      .where(col('CurrentGrade').isNull())  # keep only the last row per group
      .drop('CurrentGrade'))                # drop the temporary helper column
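For reference, a minimal sketch of how the sample DataFrames from the question could be built to try this out, assuming an active SparkSession named spark (the data is copied from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

cols = ["test_date", "student_id", "take_home", "grade"]
PyDf1 = spark.createDataFrame(
    [("2022-09-26", 655, "N", "A"),
     ("2022-09-26", 656, "N", "B"),
     ("2022-09-26", 657, "N", "C"),
     ("2022-09-26", 658, "N", "D")], cols)
PyDf2 = spark.createDataFrame(
    [("2022-09-27", 655, "N", "D"),
     ("2022-09-27", 656, "N", "C"),
     ("2022-09-27", 657, "N", "B"),
     ("2022-09-27", 658, "N", "A"),
     ("2022-09-26", 655, "N", "B")], cols)

With the DataFrames built this way, the snippet above can be run as-is.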
wwnde

Figured out the solution:

  1. Union the two tables
  2. Add an index column
  3. Assign a row_number using partitionBy (window function)
  4. Filter rows and columns
# Union the two tables
df_new = PyDf1.union(PyDf2)

# Add an index column
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

w = Window.orderBy('test_date')
df_new = df_new.withColumn('index', row_number().over(w))

# Assign row_number using partitionBy (window function),
# ordering duplicates so the row with the highest index comes first
windowSpec = Window.partitionBy(col("test_date"),
                                col("student_id"),
                                col("take_home")) \
                   .orderBy(col("index").desc())
df_new = df_new.withColumn("row_number", row_number().over(windowSpec))

# Filter rows and columns: keep only the first row of each group
df_new = df_new.filter(df_new.row_number == 1)
columns = ['test_date', 'student_id', 'take_home', 'grade']
df_new = df_new.select(columns)
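One thing to keep in mind with this approach: the index is ordered by test_date only, and the duplicate rows share the same test_date, so which source wins the tie is not guaranteed by Spark. A minimal alternative sketch that makes the precedence explicit, assuming PyDf2 should always win (the priority column name is only an illustrative choice, not part of the original solution):

from pyspark.sql import Window
from pyspark.sql.functions import col, lit, row_number

# Tag each source with a priority so PyDf2's rows win on duplicates
tagged = PyDf1.withColumn("priority", lit(1)) \
              .unionByName(PyDf2.withColumn("priority", lit(2)))

w = Window.partitionBy("test_date", "student_id").orderBy(col("priority").desc())

result = (tagged.withColumn("rn", row_number().over(w))
                .filter(col("rn") == 1)
                .drop("rn", "priority"))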
Mick