
I would like to compare two DataFrames in PySpark.

Below is my test case dataset (from Google).

So I have two DataFrames:

  1. Base DF
  2. Secondary DF

baseDF

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3500,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Vom,5000,mex,IT,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019

secDF

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,ITA,2/11/2019
22,Tom,2500,usa,HRA,2/11/2019
33,Kom,3000,uk,ITA,2/11/2019
44,Nom,4600,can,HRA,2/11/2019
55,Vom,8000,mex,ITA,2/11/2019
77,XYZ,5000,mex,ITA,2/11/2019
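
For reproducibility, the two DataFrames can be created roughly like this (just a quick sketch, assuming a SparkSession named spark; column types are inferred and Join_Date is kept as a plain string):

# build the test data exactly as listed above
baseDF = spark.createDataFrame([
    (11, 'Sam', 1000, 'ind', 'IT', '2/11/2019'),
    (22, 'Tom', 2000, 'usa', 'HR', '2/11/2019'),
    (33, 'Kom', 3500, 'uk', 'IT', '2/11/2019'),
    (44, 'Nom', 4000, 'can', 'HR', '2/11/2019'),
    (55, 'Vom', 5000, 'mex', 'IT', '2/11/2019'),
    (66, 'XYZ', 5000, 'mex', 'IT', '2/11/2019'),
], ['No', 'Name', 'Sal', 'Address', 'Dept', 'Join_Date'])

secDF = spark.createDataFrame([
    (11, 'Sam', 1000, 'ind', 'ITA', '2/11/2019'),
    (22, 'Tom', 2500, 'usa', 'HRA', '2/11/2019'),
    (33, 'Kom', 3000, 'uk', 'ITA', '2/11/2019'),
    (44, 'Nom', 4600, 'can', 'HRA', '2/11/2019'),
    (55, 'Vom', 8000, 'mex', 'ITA', '2/11/2019'),
    (77, 'XYZ', 5000, 'mex', 'ITA', '2/11/2019'),
], ['No', 'Name', 'Sal', 'Address', 'Dept', 'Join_Date'])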

I have to compare secDF and baseDF on two keys (No and Name). Where those keys match (I only need the matched records from secDF), I have to update the Sal and Dept fields of baseDF with the values from secDF.

Expected output

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,ITA,2/11/2019
22,Tom,2500,usa,HRA,2/11/2019
33,Kom,3000,uk,ITA,2/11/2019
44,Nom,4600,can,HRA,2/11/2019
55,Vom,8000,mex,ITA,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019

Using PySpark I could use subtract() to find the rows of one table not present in the other and then union the two results, or should I use withColumn to overwrite the values that satisfy the condition?
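
Roughly, the subtract-and-union route I had in mind would be something like this (just a sketch, untested; it uses a left anti-join instead of subtract() to keep the non-matching rows, and assumes the (No, Name) pair is the match key):

# rows of baseDF that have no matching (No, Name) pair in secDF
unmatched = baseDF.join(secDF.select('No', 'Name'), ['No', 'Name'], 'left_anti')

# matched rows: take Sal and Dept from secDF, keep the rest from baseDF
matched = baseDF.alias('b').join(secDF.alias('s'), ['No', 'Name'], 'inner') \
    .select('No', 'Name', 's.Sal', 'b.Address', 's.Dept', 'b.Join_Date')

combined = matched.unionByName(unmatched)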

Could someone suggest a good way of doing this?



1 Answer


You can do a left join on the two keys and coalesce the resulting Sal and Dept columns, with secdf taking precedence over basedf:

import pyspark.sql.functions as F

# Left join on the two keys so every basedf row is kept.
# For Sal and Dept, coalesce picks the secdf value when a match exists
# and falls back to the basedf value otherwise; all other columns come from basedf.
result = basedf.alias('basedf').join(
    secdf.alias('secdf'),
    ['No', 'Name'],
    'left'
).select(
    [F.coalesce(f'secdf.{c}', f'basedf.{c}').alias(c)
     if c in ('Sal', 'Dept')
     else f'basedf.{c}'
     for c in basedf.columns]
)

result.show()
+---+----+----+-------+----+---------+
| No|Name| Sal|Address|Dept|Join_Date|
+---+----+----+-------+----+---------+
| 11| Sam|1000|    ind| ITA|2/11/2019|
| 22| Tom|2500|    usa| HRA|2/11/2019|
| 33| Kom|3000|     uk| ITA|2/11/2019|
| 44| Nom|4600|    can| HRA|2/11/2019|
| 55| Vom|8000|    mex| ITA|2/11/2019|
| 66| XYZ|5000|    mex|  IT|2/11/2019|
+---+----+----+-------+----+---------+
  • Yeah, this is just test data. I will have to deal with a large amount of data: baseDF could be in the millions and secDF in the lakhs. Will the join cause a performance issue? Or, since secDF is small compared with the other, can we also do a broadcast join? – USB Jan 23 '21 at 15:22
  • 1
    You have to do a join because you need to find matching pairs. It's an equi join so performance shouldn't be too bad. You can surely use a broadcast join if it's small – mck Jan 23 '21 at 15:27
  • Yes, a join is a must. I am new to PySpark and haven't written a similar select expression before. So here, once joined, we iterate through all the columns of baseDF: every value is taken from baseDF except Sal (and Dept), and when the Sal column is reached the coalesce function is applied, so wherever No and Name matched in the join, baseDF.Sal is updated with the secDF.Sal value. – USB Jan 23 '21 at 15:45
  • 1
    @UnmeshaSreeVeni perfect description – mck Jan 23 '21 at 16:29
  • I will also test it from my end and accept the answer. – USB Jan 23 '21 at 17:17
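
On the broadcast point from the comments: if secdf comfortably fits in memory, you can add an explicit broadcast hint to the same query (a sketch only; Spark may already broadcast automatically when the frame is below spark.sql.autoBroadcastJoinThreshold):

import pyspark.sql.functions as F

result = basedf.alias('basedf').join(
    F.broadcast(secdf.alias('secdf')),  # ship secdf to every executor instead of shuffling basedf
    ['No', 'Name'],
    'left'
).select(
    [F.coalesce(f'secdf.{c}', f'basedf.{c}').alias(c)
     if c in ('Sal', 'Dept')
     else f'basedf.{c}'
     for c in basedf.columns]
)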