I have the following two identically structured DataFrames that share an id column.

val originalDF = Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",600,80000),(3,"rishi","ahmedabad",510,65000))
                .toDF("id","name","city","credit_score","credit_limit")
scala> originalDF.show(false)
+---+------+---------+------------+------------+
|id |name  |city     |credit_score|credit_limit|
+---+------+---------+------------+------------+
|1  |gaurav|jaipur   |550         |70000       |
|2  |sunil |noida    |600         |80000       |
|3  |rishi |ahmedabad|510         |65000       |
+---+------+---------+------------+------------+
val changedDF= Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",650,90000),(4,"Joshua","cochin",612,85000))
                .toDF("id","name","city","credit_score","credit_limit")
scala> changedDF.show(false)
+---+------+------+------------+------------+
|id |name  |city  |credit_score|credit_limit|
+---+------+------+------------+------------+
|1  |gaurav|jaipur|550         |70000       |
|2  |sunil |noida |650         |90000       |
|4  |Joshua|cochin|612         |85000       |
+---+------+------+------------+------------+

So I wrote a UDF to work out which column values changed.

val diff = udf((col: String, c1: String, c2: String) => if (c1 == c2) "" else col )
val somedf=changedDF.alias("a").join(originalDF.alias("b"), col("a.id") === col("b.id")).withColumn("diffcolumn", split(concat_ws(",",changedDF.columns.map(x => diff(lit(x), changedDF(x), originalDF(x))):_*),","))
scala> somedf.show(false)
+---+------+------+------------+------------+---+------+------+------------+------------+----------------------------------+
|id |name  |city  |credit_score|credit_limit|id |name  |city  |credit_score|credit_limit|diffcolumn                        |
+---+------+------+------------+------------+---+------+------+------------+------------+----------------------------------+
|1  |gaurav|jaipur|550         |70000       |1  |gaurav|jaipur|550         |70000       |[, , , , ]                        |
|2  |sunil |noida |650         |90000       |2  |sunil |noida |600         |80000       |[, , , credit_score, credit_limit]|
+---+------+------+------------+------------+---+------+------+------------+------------+----------------------------------+

But I'm not able to get id and diffcolumn separately. If I do somedf.select('id), it gives me an ambiguity error because there are two id columns in the joined table. For each id whose values have changed, I want to get the names of the changed columns in an array. For example, in changedDF the credit_score and credit_limit of id=2 (name=sunil) have changed, so I want the resulting DataFrame to look like this:

+---+----------------------------------+
|id |diffcolumn                        |
+---+----------------------------------+
|2  |[, , , credit_score, credit_limit]|
+---+----------------------------------+

Can anyone suggest an approach to get the id and the changed columns separately in a DataFrame?

3 Answers

For your reference, these kinds of diffs can easily be done with the spark-extension package. It provides the diff transformation that builds that complex query for you:

import uk.co.gresearch.spark.diff._

val options = DiffOptions.default.withChangeColumn("changes")  // needed to get the optional 'changes' column
val diff = originalDF.diff(changedDF, options, "id")

diff.show(false)
+----+----------------------------+---+---------+----------+---------+----------+-----------------+------------------+-----------------+------------------+
|diff|changes                     |id |left_name|right_name|left_city|right_city|left_credit_score|right_credit_score|left_credit_limit|right_credit_limit|
+----+----------------------------+---+---------+----------+---------+----------+-----------------+------------------+-----------------+------------------+
|N   |[]                          |1  |gaurav   |gaurav    |jaipur   |jaipur    |550              |550               |70000            |70000             |
|I   |null                        |4  |null     |Joshua    |null     |cochin    |null             |612               |null             |85000             |
|C   |[credit_score, credit_limit]|2  |sunil    |sunil     |noida    |noida     |600              |650               |80000            |90000             |
|D   |null                        |3  |rishi    |null      |ahmedabad|null      |510              |null              |65000            |null              |
+----+----------------------------+---+---------+----------+---------+----------+-----------------+------------------+-----------------+------------------+

diff.select($"id", $"diff", $"changes").show(false)
+---+----+----------------------------+
|id |diff|changes                     |
+---+----+----------------------------+
|1  |N   |[]                          |
|4  |I   |null                        |
|2  |C   |[credit_score, credit_limit]|
|3  |D   |null                        |
+---+----+----------------------------+

While this is a simple example, diffing DataFrames can become complicated when wide schemas and null values are involved. That package is well-tested, so you don't have to worry about getting that query right yourself.

EnricoM

Try this:

    val aliasedChangedDF = changedDF.as("a")
    val aliasedOriginalDF = originalDF.as("b")
    val diff = udf((col: String, c1: String, c2: String) => if (c1 == c2) "" else col )
    val somedf=aliasedChangedDF.join(aliasedOriginalDF, col("a.id") === col("b.id")).withColumn("diffcolumn", split(concat_ws(",",changedDF.columns.map(x => diff(lit(x), changedDF(x), originalDF(x))):_*),","))
    somedf.select(col("a.id").as("id"),col("diffcolumn"))
Suhas NM

Just change your join condition from col("a.id") === col("b.id") to "id".

Then, there will be only a single id column.

Further, you don't need the alias("a") and alias("b"). So your join simplifies from

changedDF.alias("a").join(originalDF.alias("b"), col("a.id") === col("b.id"))

to

changedDF.join(originalDF, "id")
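Putting this together with the question's data, here is a minimal sketch of the whole thing, not a definitive implementation. It makes a few assumptions that are not in the original post: Spark 2.4 or later (for array_remove), a local SparkSession created only so the snippet is self-contained, and the hypothetical names compared, markers and result. It also swaps the UDF for the built-in when and the null-safe <=> operator, and compares only the non-key columns, since the join on "id" leaves a single id column.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local session so the sketch runs on its own.
// In spark-shell, `spark` already exists and this line can be dropped.
val spark = SparkSession.builder().master("local[*]").appName("diff-sketch").getOrCreate()
import spark.implicits._

val originalDF = Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",600,80000),(3,"rishi","ahmedabad",510,65000))
  .toDF("id","name","city","credit_score","credit_limit")
val changedDF = Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",650,90000),(4,"Joshua","cochin",612,85000))
  .toDF("id","name","city","credit_score","credit_limit")

// Compare every column except the join key.
val compared = changedDF.columns.filter(_ != "id")

// For each compared column: its name if the two values differ
// (null-safe comparison via <=>), otherwise an empty string.
val markers = compared.map { c =>
  when(!(changedDF(c) <=> originalDF(c)), lit(c)).otherwise(lit(""))
}

val result = changedDF.join(originalDF, "id")   // join on "id" keeps a single id column
  .select(col("id"), array_remove(array(markers: _*), "").as("diffcolumn"))
  .where(size(col("diffcolumn")) > 0)           // keep only ids with at least one change

result.show(false)
```

With the sample data this should leave only id 2, with credit_score and credit_limit in diffcolumn. Because it is an inner join, ids 3 and 4, which exist in only one of the two DataFrames, do not appear at all.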
EnricoM