24

Spark dataframe 1 -:

+------+-------+---------+----+---+-------+
|city  |product|date     |sale|exp|wastage|
+------+-------+---------+----+---+-------+
|city 1|prod 1 |9/29/2017|358 |975|193    |
|city 1|prod 2 |8/25/2017|50  |687|201    |
|city 1|prod 3 |9/9/2017 |236 |431|169    |
|city 2|prod 1 |9/28/2017|358 |975|193    |
|city 2|prod 2 |8/24/2017|50  |687|201    |
|city 3|prod 3 |9/8/2017 |236 |431|169    |
+------+-------+---------+----+---+-------+

Spark dataframe 2 -:

+------+-------+---------+----+---+-------+
|city  |product|date     |sale|exp|wastage|
+------+-------+---------+----+---+-------+
|city 1|prod 1 |9/29/2017|358 |975|193    |
|city 1|prod 2 |8/25/2017|50  |687|201    |
|city 1|prod 3 |9/9/2017 |230 |430|160    |
|city 1|prod 4 |9/27/2017|350 |90 |190    |
|city 2|prod 2 |8/24/2017|50  |687|201    |
|city 3|prod 3 |9/8/2017 |236 |431|169    |
|city 3|prod 4 |9/18/2017|230 |431|169    |
+------+-------+---------+----+---+-------+

Please find out spark dataframe for following conditions applied on above given spark dataframe 1 and spark dataframe 2,

  1. Deleted Records
  2. New Records
  3. Records with no changes
  4. Records with changes

    Here key of comprision are 'city', 'product', 'date'.

we need solution without using Spark SQL.

zero323
  • 322,348
  • 103
  • 959
  • 935
prakash
  • 419
  • 2
  • 4
  • 13
  • 1
    This is for anyone that stumbles onto this problem and needs more. I have found the `except` doesn't always provide me with everything so I have made a library, one part of this library is a dataset comparison https://github.com/AbsaOSS/hermes – Saša Zejnilović Feb 13 '20 at 07:50

8 Answers8

37

I am not sure about finding the deleted and modified records but you can use exceptAll function to get the difference

df2.exceptAll(df1)

This returns the rows that has been added or modified in dataframe2 or record with changes. Output:

+------+-------+---------+----+---+-------+
|  city|product|     date|sale|exp|wastage|
+------+-------+---------+----+---+-------+
|city 3| prod 4|9/18/2017| 230|431|    169|
|city 1| prod 4|9/27/2017| 350| 90|    190|
|city 1| prod 3|9/9/2017 | 230|430|    160|
+------+-------+---------+----+---+-------+

You can also try with join and filter to get the changed and unchanged data as

df1.join(df2, Seq("city","product", "date"), "left").show(false)
df1.join(df2, Seq("city","product", "date"), "right").show(false)

Hope this helps!

The Singularity
  • 2,428
  • 3
  • 19
  • 48
koiralo
  • 22,594
  • 6
  • 51
  • 72
14

A scalable and easy way is to diff the two DataFrames with spark-extension:

import uk.co.gresearch.spark.diff._

df1.diff(df2, "city", "product", "date").show

+----+------+-------+----------+---------+----------+--------+---------+------------+-------------+
|diff|  city|product|      date|left_sale|right_sale|left_exp|right_exp|left_wastage|right_wastage|
+----+------+-------+----------+---------+----------+--------+---------+------------+-------------+
|   N|city 1|prod 2 |2017-08-25|       50|        50|     687|      687|         201|          201|
|   C|city 1|prod 3 |2017-09-09|      236|       230|     431|      430|         169|          160|
|   I|city 3|prod 4 |2017-09-18|     null|       230|    null|      431|        null|          169|
|   N|city 3|prod 3 |2017-09-08|      236|       236|     431|      431|         169|          169|
|   D|city 2|prod 1 |2017-09-28|      358|      null|     975|     null|         193|         null|
|   I|city 1|prod 4 |2017-09-27|     null|       350|    null|       90|        null|          190|
|   N|city 1|prod 1 |2017-09-29|      358|       358|     975|      975|         193|          193|
|   N|city 2|prod 2 |2017-08-24|       50|        50|     687|      687|         201|          201|
+----+------+-------+----------+---------+----------+--------+---------+------------+-------------+

It identifies Inserted, Changed, Deleted and uN-changed rows.

EnricoM
  • 321
  • 3
  • 3
2

Check out MegaSparkDiff its an open source project on GitHub that helps compare dataframes .. the project is not yet published in maven central but you can look at the SparkCompare scala class that compares 2 dataframes

the below code snippet will give you 2 dataframes one has rows inLeftButNotInRight and another one having InRightButNotInLeft.

if you do a JOIN between both then you can apply some logic to identify the missing primary keys (where possible) and then those keys would constitute the deleted records.

We are working on adding the use case that you are looking for in the project. https://github.com/FINRAOS/MegaSparkDiff

https://github.com/FINRAOS/MegaSparkDiff/blob/master/src/main/scala/org/finra/msd/sparkcompare/SparkCompare.scala

private def compareSchemaDataFrames(left: DataFrame , leftViewName: String
                              , right: DataFrame , rightViewName: String) :Pair[DataFrame, DataFrame] = {
    //make sure that column names match in both dataFrames
    if (!left.columns.sameElements(right.columns))
      {
        println("column names were different")
        throw new Exception("Column Names Did Not Match")
      }

    val leftCols = left.columns.mkString(",")
    val rightCols = right.columns.mkString(",")

    //group by all columns in both data frames
    val groupedLeft = left.sqlContext.sql("select " + leftCols + " , count(*) as recordRepeatCount from " +  leftViewName + " group by " + leftCols )
    val groupedRight = left.sqlContext.sql("select " + rightCols + " , count(*) as recordRepeatCount from " +  rightViewName + " group by " + rightCols )

    //do the except/subtract command
    val inLnotinR = groupedLeft.except(groupedRight).toDF()
    val inRnotinL = groupedRight.except(groupedLeft).toDF()

    return new ImmutablePair[DataFrame, DataFrame](inLnotinR, inRnotinL)
  }
2

see below the utility function I used to compare two dataframes using the following criteria

  1. Column length
  2. Record count
  3. Column by column comparing for all records

Task three is done by using a hash of concatenation of all columns in a record.

def verifyMatchAndSaveSignatureDifferences(oldDF: DataFrame, newDF: DataFrame, pkColumn: String) : Long = {
  assert(oldDF.columns.length == newDF.columns.length, s"column lengths don't match")
  assert(oldDF.count == newDF.count, s"record count don't match")

  def createHashColumn(df: DataFrame) : Column = {
     val colArr = df.columns
     md5(concat_ws("", (colArr.map(col(_))) : _*))
  }

  val newSigDF = newDF.select(col(pkColumn), createHashColumn(newDF).as("signature_new"))
  val oldSigDF = oldDF.select(col(pkColumn), createHashColumn(oldDF).as("signature"))

  val joinDF = newSigDF.join(oldSigDF, newSigDF("pkColumn") === oldSigDF("pkColumn")).where($"signature" !== $"signature_new").cache

  val diff = joinDF.count
  //write out any recorsd that don't match
  if (diff > 0)
     joinDF.write.saveAsTable("signature_table")

  joinDF.unpersist()

  diff
}

If the method returns 0, then both dataframes are exactly the same in everything else, a table named signature_table in default schema of hive will contains all records that differ in both.

Hope this helps.

okmich
  • 700
  • 5
  • 11
1

Spark version: 2.2.0

Use both except and left anti join

df2.except(df1) will be like:

city product date sale exp wastage
city 3 prod 4 9/18/2017 230 431 169
city 1 prod 4 9/27/2017 350 90 190
city 1 prod 3 9/9/2017 230 430 160

just as koiralo said, but the deleted item 'city 2 prod 1' is lost, so we need left anti join(or left join with filters):

select * from df1 left anti join df2 on df1.city=df2.city and df1.product=df2.product

then union the results of df2.except(df1) and left anti join

But I didn't test the performance of left anti join on large dataset

PS: If your spark version is over 2.4, using spark-extension will be easier

vince
  • 176
  • 1
  • 4
1

I just discovered a wonderful package for pyspark that compares two dataframes. The name of the package is datacompy

https://capitalone.github.io/datacompy/

example code:

import datacompy as dc

comparison = dc.SparkCompare(spark, base_df=df1, compare_df=df2, join_columns=common_keys, match_rates=True)
comparison.report()

The above code will generate a summary report, and the one below it will give you the mismatches.

comparison.rows_both_mismatch.display()

There are also more fearures that you can explore.

George Sotiropoulos
  • 1,864
  • 1
  • 22
  • 32
0

Using Spark different join types seems to be the key to computing Deletions, Additions, and Updates on rows.

This question illustrates the different types of joins depending on what you are trying to achieve: What are the various join types in Spark?

Michel Hua
  • 1,614
  • 2
  • 23
  • 44
0

Let's say we have two DataFrames, z1 and z1. Option 1 is good for rows without duplicates. You can try these in spark-shell.

  • Option 1: do except directly

val inZ1NotInZ2 = z1.except(z2).toDF()
val inZ2NotInZ1 = z2.except(z1).toDF()

inZ1NotInZ2.show
inZ2NotInZ1.show
  • Option 2: use GroupBy(for DataFrame with duplicate rows)
val z1Grouped = z1.groupBy(z1.columns.map(c => z1(c)).toSeq : _*).count().withColumnRenamed("count", "recordRepeatCount")
val z2Grouped = z2.groupBy(z2.columns.map(c => z2(c)).toSeq : _*).count().withColumnRenamed("count", "recordRepeatCount")

val inZ1NotInZ2 = z1Grouped.except(z2Grouped).toDF()
val inZ2NotInZ1 = z2Grouped.except(z1Grouped).toDF()

  • Option 3, use exceptAll, which should also work for data with duplicate rows
// Source Code: https://github.com/apache/spark/blob/50538600ec/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2029
val inZ1NotInZ2 = z1.exceptAll(z2).toDF()
val inZ2NotInZ1 = z2.exceptAll(z1).toDF()

LeOn - Han Li
  • 9,388
  • 1
  • 65
  • 59