1

I have two datasets, df1 and df2. I need to detect every record in df2 that differs from df1 and produce a resulting dataset with an additional column that flags the differing records. Here is an example.

package playground

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}

object sample4 {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .getOrCreate()

  val sc = spark.sparkContext

  final case class Owner(a: Long,
                         b: String,
                         c: Long,
                         d: Short,
                         e: String,
                         f: String,
                         o_qtty: Double)

  final case class Result(a: Long,
                          b: String,
                          c: Long,
                          d: Short,
                          e: String,
                          f: String,
                          o_qtty: Double,
                          isDiff: Boolean)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)

    import spark.implicits._

    val data1 = Seq(
      Owner(11, "A", 666, 2017, "x", "y", 50),
      Owner(11, "A", 222, 2018, "x", "y", 20),
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 120),
      Owner(22, "B", 555, 2018, "x", "y", 20),
      Owner(99, "D", 888, 2018, "x", "y", 100),
      Owner(11, "A", 888, 2018, "x", "y", 100),
      Owner(11, "A", 666, 2018, "x", "y", 80),
      Owner(33, "C", 666, 2018, "x", "y", 80),
      Owner(11, "A", 444, 2018, "x", "y", 50),
    )

    val data2 = Seq(
      Owner(11, "A", 666, 2017, "x", "y", 50),
      Owner(11, "A", 222, 2018, "x", "y", 20),
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 55),
      Owner(22, "B", 555, 2018, "x", "y", 20),
      Owner(99, "D", 888, 2018, "x", "y", 100),
      Owner(11, "A", 888, 2018, "x", "y", 100),
      Owner(11, "A", 666, 2018, "x", "y", 80),
      Owner(33, "C", 666, 2018, "x", "y", 80),
      Owner(11, "A", 444, 2018, "x", "y", 50),
    )

    val expected = Seq(
      Result(11, "A", 666, 2017, "x", "y", 50, isDiff = false),
      Result(11, "A", 222, 2018, "x", "y", 20, isDiff = false),
      Result(33, "C", 444, 2018, "x", "y", 20, isDiff = false),
      Result(33, "C", 555, 2018, "x", "y", 55, isDiff = true),
      Result(22, "B", 555, 2018, "x", "y", 20, isDiff = false),
      Result(99, "D", 888, 2018, "x", "y", 100, isDiff = false),
      Result(11, "A", 888, 2018, "x", "y", 100, isDiff = false),
      Result(11, "A", 666, 2018, "x", "y", 80, isDiff = false),
      Result(33, "C", 666, 2018, "x", "y", 80, isDiff = false),
      Result(11, "A", 444, 2018, "x", "y", 50, isDiff = false),
    )


    val df1 = spark
      .createDataset(data1)
      .as[Owner]
      .cache()

    val df2 = spark
      .createDataset(data2)
      .as[Owner]
      .cache()
  }

}

What is the most efficient way to do that?

Michael
  • Does this answer your question? [Compare two Spark dataframes](https://stackoverflow.com/questions/45553300/compare-two-spark-dataframes) – Vitaliy Jul 26 '20 at 05:47
  • Not really; that question doesn't cover the flag that marks which records differ. – Michael Jul 26 '20 at 07:06
  • I think the two answers below are not efficient, because `join` and `intersect` build a hash table over all records in all partitions and compare everything. At the very least, you can try the simplest solution, `df1.rdd.zip(df2.rdd).map {case (x,y) => (x, x != y)}`, and compare its speed on a real dataset. PS: it is a good idea to replace one-character strings with chars, because char comparison is very fast – Mikhail Ionkin Jul 26 '20 at 10:16
  • @MikhailIonkin can you post a full answer so I can accept it? You are making a good point here! – Michael Jul 31 '20 at 10:32
  • Done. I do not have a real dataset, so I cannot prove that my answer is faster, but I believe it is, based on a test on a small dataset and on the documentation – Mikhail Ionkin Jul 31 '20 at 16:18

3 Answers

1

I think this code could help you to find your answer:

import org.apache.spark.sql.functions.lit

// Rows of df2 that also exist in df1 (unchanged)
val intersectDF = df2.intersect(df1)
// Rows of df2 with no identical row in df1 (changed or new).
// Using df2.except(df1) keeps only the df2 side of a changed record.
val diffDF = df2.except(df1)

val intersectDF2 = intersectDF.withColumn("isDiff", lit(false))
val diffDF2 = diffDF.withColumn("isDiff", lit(true))
val answer = intersectDF2.union(diffDF2)

// Rows common to both DataFrames
intersectDF2.show()
// Rows of df2 that differ from df1
diffDF2.show()
// Final result
answer.show()
1

Perhaps this is helpful -

Do a left join from df2 to df1 on all columns and flag the df2 rows that have no match as different:

    import org.apache.spark.sql.functions.{col, lit}

    // Marker column: after the join it is null for df2 rows with no identical row in df1
    val df1_hash = df1.withColumn("x", lit(0))
    df2.join(df1_hash, df2.columns, "left")
      .select(col("x").isNull.as("isDiff") +: df2.columns.map(df2(_)): _*)
      .show(false)

    /**
      * +------+---+---+---+----+---+---+------+
      * |isDiff|a  |b  |c  |d   |e  |f  |o_qtty|
      * +------+---+---+---+----+---+---+------+
      * |false |11 |A  |666|2017|x  |y  |50.0  |
      * |false |11 |A  |222|2018|x  |y  |20.0  |
      * |false |33 |C  |444|2018|x  |y  |20.0  |
      * |true  |33 |C  |555|2018|x  |y  |55.0  |
      * |false |22 |B  |555|2018|x  |y  |20.0  |
      * |false |99 |D  |888|2018|x  |y  |100.0 |
      * |false |11 |A  |888|2018|x  |y  |100.0 |
      * |false |11 |A  |666|2018|x  |y  |80.0  |
      * |false |33 |C  |666|2018|x  |y  |80.0  |
      * |false |11 |A  |444|2018|x  |y  |50.0  |
      * +------+---+---+---+----+---+---+------+
      */
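
Two things a join on column names may not handle well are null values (a plain equality join never matches null with null) and duplicate rows in df1 (each duplicate multiplies the matching df2 row). The following is only a sketch of one way to cover both, not part of the original answer: the names `FlagDiffJoin`, `flagDiff` and the helper column `__matched` are hypothetical, and it assumes the two inputs have the same column names and do not share lineage (so that `current(c)` and `prev(c)` resolve unambiguously).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object FlagDiffJoin {

  /** Returns `current` plus an `isDiff` column that is true for rows
    * with no identical row in `previous`. Names here are illustrative. */
  def flagDiff(previous: DataFrame, current: DataFrame): DataFrame = {
    val marker = "__matched" // hypothetical helper column, assumed not to clash

    // Deduplicate so that one df2 row can match at most one df1 row
    val prev = previous.distinct().withColumn(marker, lit(true))

    // Null-safe equality (<=>) on every column, combined with AND
    val cond = current.columns.map(c => current(c) <=> prev(c)).reduce(_ && _)

    // Keep the columns of `current`; the marker is null when nothing matched
    val outCols = current.columns.map(current(_)) :+ prev(marker).isNull.as("isDiff")
    current.join(prev, cond, "left").select(outCols: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("flag-diff-join").master("local[1]").getOrCreate()
    import spark.implicits._

    val before = Seq((1L, Option("a"), 1.0), (2L, Option.empty[String], 2.0)).toDF("id", "tag", "qty")
    val after  = Seq((1L, Option("a"), 1.0), (2L, Option.empty[String], 5.0)).toDF("id", "tag", "qty")

    flagDiff(before, after).show(false)
    spark.stop()
  }
}
```

With the Datasets from the question, this would be called as `FlagDiffJoin.flagDiff(df1.toDF(), df2.toDF())`. Whether it is faster or slower than the plain column-name join depends on the data and should be measured.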
Som
0

I think the other two answers are not efficient, because join and intersect build a hash table over all records in all partitions and compare everything. At the very least, you can try the simplest solution:

  df1.rdd.zip(df2.rdd).map {
    // x is the record from df1, y is the record at the same position in df2
    case (x, y) => (y, x != y)
  }

and compare its speed on a real dataset. Note that zip requires both RDDs to have the same number of partitions and the same number of elements in each partition, so this only works when the two datasets are aligned row by row.

It is also a good idea to replace one-character strings with chars, because char comparison is very fast.

I do not have a real dataset, so I cannot prove that my answer is faster, but I believe it is, based on a test on a small dataset, the documentation, and the implementation of join and intersect. Unlike join or intersect, zip does not shuffle data between partitions.
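
To get from the zipped pairs back to a typed Dataset with the `isDiff` column, something like the following might work. This is a minimal sketch, not a benchmarked implementation: `ZipDiff` and `flagByPosition` are hypothetical names, the case classes are copied from the question, and it assumes both Datasets hold the same records in the same order with identical partitioning, which `RDD.zip` needs in order to run at all.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object ZipDiff {

  final case class Owner(a: Long, b: String, c: Long, d: Short,
                         e: String, f: String, o_qtty: Double)

  final case class Result(a: Long, b: String, c: Long, d: Short,
                          e: String, f: String, o_qtty: Double, isDiff: Boolean)

  /** Compares the two Datasets position by position and keeps the record
    * from `after`, flagged when it differs from the one in `before`. */
  def flagByPosition(spark: SparkSession,
                     before: Dataset[Owner],
                     after: Dataset[Owner]): Dataset[Result] = {
    import spark.implicits._
    val flagged = before.rdd.zip(after.rdd).map { case (x, y) =>
      // Case-class equality compares all fields
      Result(y.a, y.b, y.c, y.d, y.e, y.f, y.o_qtty, isDiff = x != y)
    }
    spark.createDataset(flagged)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("zip-diff").master("local[1]").getOrCreate()
    import spark.implicits._

    val before = Seq(
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 120)
    ).toDS()
    val after = Seq(
      Owner(33, "C", 444, 2018, "x", "y", 20),
      Owner(33, "C", 555, 2018, "x", "y", 55)
    ).toDS()

    flagByPosition(spark, before, after).show(false)
    spark.stop()
  }
}
```

If the two inputs come from independent reads, their row order and partitioning are generally not guaranteed to line up, so a key-based approach may be the safer choice in that situation.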

Mikhail Ionkin