I have two dataframes, let's say dfA and dfB.

dfA:
IdCol | Col2  | Col3
id1   | val2  | val3

dfB:
IdCol | Col2  | Col3
id1   | val2  | val4

The two dataframes join on IdCol. I want to compare them row by row and keep, in another dataframe, the columns that differ together with their values from each side. For example, from the two dataframes above I want this result:

dfChanges:
RowId | Col  | dfA_value | dfB_value
id1   | Col3 | val3      | val4

I am kinda stuck on how to do this. Can anyone provide a direction? Thanks in advance

EDIT

My attempt is below, but it is neither very clear nor very performant. Is there a better way to do it?

from pyspark.sql.functions import col, lit

dfChanges = None

# For every column except the id
for colName in dfA.columns[1:]:

    # Select the id and the target column from both datasets
    # and subtract to find the rows that differ
    changedRows = dfA.select('IdCol', colName).subtract(dfB.select('IdCol', colName))

    # Join with dfB on the id to take the value of the target column from there
    temp = changedRows.join(
        dfB.select(col('IdCol'), col(colName).alias("dfB_value")), 'IdCol', 'inner')

    # Rename the columns to match the output layout
    temp = temp.withColumnRenamed(colName, "dfA_value")
    temp = temp.withColumn("Col", lit(colName))

    # Append to a single dataframe
    if dfChanges is None:
        dfChanges = temp
    else:
        dfChanges = dfChanges.union(temp)
Michail N

1 Answer

Join both data frames by id:

dfA = spark.createDataFrame(
    [("id1", "val2", "val3")], ("Idcol1", "Col2", "Col3")
)

dfB = spark.createDataFrame(
    [("id1", "val2", "val4")], ("Idcol1", "Col2", "Col3")
)

dfAB = dfA.alias("dfA").join(dfB.alias("dfB"), "Idcol1")

Reshape:

from pyspark.sql.functions import col, struct

ids = ["Idcol1"]

vals = [struct(
    col("dfA.{}".format(c)).alias("dfA_value"),
    col("dfB.{}".format(c)).alias("dfB_value")
).alias(c) for c in dfA.columns if c not in ids]

and melt (as defined here)

(melt(dfAB.select(ids + vals), ids, [c for c in dfA.columns if c not in ids])
    .where(col("value.dfA_value") != col("value.dfB_value"))
    .select(ids + ["variable" , "value.dfA_value", "value.dfB_value"])
    .show())

+------+--------+---------+---------+                                           
|Idcol1|variable|dfA_value|dfB_value|
+------+--------+---------+---------+
|   id1|    Col3|     val3|     val4|
+------+--------+---------+---------+
Alper t. Turker
  • This works perfectly, but I face this problem: when my original dataframe has mixed data types in its columns (e.g. one is Integer and the others are Strings), the melt function crashes with a "cannot resolve array" exception. I currently cast all columns to String, but I think this is unnecessary. Any suggestions for modifying the melt function to handle all data types? Should I use another data type besides struct in the melt function? – Michail N Jan 10 '18 at 12:55
  • If you want all changes in a single table, then a single type is required. Spark uses a relational model and doesn't support union types, so if the input is mixed you have to cast. You could try to compare the data before melting to avoid problems with imprecision. – Alper t. Turker Jan 10 '18 at 12:58
  • Thank you very much, this was really helpful – Michail N Jan 10 '18 at 13:01