I'm new to Scala and am having problems writing unit tests.

I'm trying to compare two Spark DataFrames for equality in Scala for unit testing, and have realized that there is no easy way to check two Spark DataFrames for equality.

The C++ equivalent code would be (assuming that the DataFrames are represented as two-dimensional arrays in C++):

    bool framesEqual(int expected[10][2], int result[10][2]) {
        for (int row = 0; row < 10; row++) {
            for (int col = 0; col < 2; col++) {
                if (expected[row][col] != result[row][col]) return false;
            }
        }
        return true;
    }

The actual test would involve checking equality based on the data types of the DataFrames' columns (e.g. testing with a precision tolerance for floats).

It seems that there is no easy way to iterate over all the elements of a DataFrame in Scala, and the other solutions for checking equality of two DataFrames, such as df1.except(df2), do not work in my case because I need to support equality testing with a tolerance for floats and doubles.

Of course, I could try to round all the elements beforehand and compare the results afterwards, but I would like to see if there are any other solutions that would allow me to iterate through the DataFrames to check for equality.
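For reference, here is a minimal sketch of that rounding workaround, assuming all columns to compare are DoubleType and that rounding to a fixed scale is acceptable (note that values straddling a rounding boundary can still compare unequal):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.round
    import org.apache.spark.sql.types.DoubleType

    // Round every double column to the given scale so that except() can be
    // used afterwards for an approximate comparison.
    def roundDoubles(df: DataFrame, scale: Int): DataFrame =
      df.schema.fields.foldLeft(df) { (acc, field) =>
        if (field.dataType == DoubleType)
          acc.withColumn(field.name, round(acc(field.name), scale))
        else acc
      }

    // Equal up to ~3 decimal places only if both directed diffs are empty.
    val approxEqual =
      roundDoubles(df1, 3).except(roundDoubles(df2, 3)).count == 0 &&
        roundDoubles(df2, 3).except(roundDoubles(df1, 3)).count == 0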

codeinstyle
  • How big are your dataframes? If they are not so big, you could sort/collect them and then easily compare them. – cheseaux Nov 08 '16 at 23:02
  • Since those are unit-test data frames, those should be quite small. Just collect them into a List and compare. – sarveshseri Nov 09 '16 at 06:20
  • Yeah, my test currently collects the data frames into a list and compares them, but I was hoping to create tools that could also test on bigger data frames as well. I'm guessing that there's no easy way of accomplishing this? – codeinstyle Nov 09 '16 at 18:24
  • *** Asked 3 years, 4 months ago Active 5 months ago Viewed 7k times --- YET still no Answer accepted ... – Yordan Georgiev Apr 02 '20 at 20:43
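For reference, a minimal sketch of the sort/collect approach suggested in the comments above, assuming the DataFrames are small enough to collect to the driver (this checks exact equality only, without tolerance):

    import org.apache.spark.sql.functions.col

    // Sort by every column so row order cannot cause a false mismatch,
    // then collect both sides and compare the local arrays of Rows.
    val left  = df1.sort(df1.columns.map(col): _*).collect()
    val right = df2.sort(df2.columns.map(col): _*).collect()
    assert(left.sameElements(right))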

3 Answers

    import org.scalatest.{BeforeAndAfterAll, FeatureSpec, Matchers}

    outDf.collect() should contain theSameElementsAs (dfComparable.collect())

    // or (note: order matters!)
    // outDf.except(dfComparable).toDF().count should be(0)
    outDf.except(dfComparable).count should be(0)
Yordan Georgiev
  • The except function already returns a DataFrame, so there is no need for `toDF` – bogdan.rusu Oct 25 '19 at 08:45
  • `outDf.except(dfComparable).count should be(0)` is not a good choice because `.except` returns the rows from the left side that are not in the right side; if some elements are missing from the left, the test will not fail. `assertSmallDataFrameEquality` is a better alternative, see https://stackoverflow.com/questions/31197353/dataframe-equality-in-apache-spark – Theo Oct 22 '20 at 07:41
  • `assertDataFrameEquals` from spark-testing-base is also an alternative. – Theo Oct 22 '20 at 07:47
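Following up on Theo's comment: since `except` only reports rows on the left that are missing from the right, a symmetric check needs both directions. A minimal sketch, assuming both DataFrames share a schema:

    // Both directed diffs must be empty for the frames to be equal
    // (up to duplicate-row multiplicity, since except is set-based).
    outDf.except(dfComparable).count should be(0)
    dfComparable.except(outDf).count should be(0)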

If you want to check whether two DataFrames are equal for testing purposes, you can make use of the subtract() method of DataFrame (supported in version 1.3 and above).

You can check whether the diff of the two DataFrames is empty, i.e. df1.subtract(df2).count() == 0.
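For what it's worth, in the Scala DataFrame API this operation is `except`, and it follows set semantics, so duplicate rows collapse. A sketch of a stricter symmetric check that also compares row counts:

    // Both directed diffs must be empty and the row counts must match,
    // so duplicated rows cannot slip through the set-based comparison.
    val isEqual = df1.except(df2).count() == 0 &&
      df2.except(df1).count() == 0 &&
      df1.count() == df2.count()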

Amit Kulkarni
  • Thanks for the suggestion, but `df1.except(df2)`, which I mentioned in my question, has the same functionality as `df1.subtract(df2)` and does not really work in this situation, where I am hoping to compare the values with precision tolerance. – codeinstyle Nov 09 '16 at 18:26

Assuming that you have a fixed number of columns and rows, one solution could be to join both DFs by a row index (in case you do not have IDs for the records) and then iterate directly over the final DF [which has all the columns of both DFs]. Something like this:

Schemas
DF1
root
 |-- col1: double (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: double (nullable = true)

DF2
root
 |-- col1: double (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: double (nullable = true)

df1
+----------+-----------+------+
|      col1|       col2|  col3|
+----------+-----------+------+
|1.20000001|       1.21|   1.2|
|    2.1111|        2.3|  22.2|
|       3.2|2.330000001| 2.333|
|    2.2444|      2.344|2.3331|
+----------+-----------+------+

df2
+------+-----+------+
|  col1| col2|  col3|
+------+-----+------+
|   1.2| 1.21|   1.2|
|2.1111|  2.3|  22.2|
|   3.2| 2.33| 2.333|
|2.2444|2.344|2.3331|
+------+-----+------+

Added row index
df1
+----------+-----------+------+---+
|      col1|       col2|  col3|row|
+----------+-----------+------+---+
|1.20000001|       1.21|   1.2|  0|
|    2.1111|        2.3|  22.2|  1|
|       3.2|2.330000001| 2.333|  2|
|    2.2444|      2.344|2.3331|  3|
+----------+-----------+------+---+

df2
+------+-----+------+---+
|  col1| col2|  col3|row|
+------+-----+------+---+
|   1.2| 1.21|   1.2|  0|
|2.1111|  2.3|  22.2|  1|
|   3.2| 2.33| 2.333|  2|
|2.2444|2.344|2.3331|  3|
+------+-----+------+---+

Combined DF
+---+----------+-----------+------+------+-----+------+
|row|      col1|       col2|  col3|  col1| col2|  col3|
+---+----------+-----------+------+------+-----+------+
|  0|1.20000001|       1.21|   1.2|   1.2| 1.21|   1.2|
|  1|    2.1111|        2.3|  22.2|2.1111|  2.3|  22.2|
|  2|       3.2|2.330000001| 2.333|   3.2| 2.33| 2.333|
|  3|    2.2444|      2.344|2.3331|2.2444|2.344|2.3331|
+---+----------+-----------+------+------+-----+------+

This is how you can do that:

println("Schemas")
    println("DF1")
    df1.printSchema()
    println("DF2")
    df2.printSchema()
    println("df1")
    df1.show
    println("df2")
    df2.show
    val finaldf1 = df1.withColumn("row", monotonically_increasing_id())
    val finaldf2 = df2.withColumn("row", monotonically_increasing_id())
    println("Added row index")
    println("df1")
    finaldf1.show()
    println("df2")
    finaldf2.show()

    val joinedDfs = finaldf1.join(finaldf2, "row")
    println("Combined DF")
    joinedDfs.show()

    val tolerance = 0.001
    def isInValidRange(a: Double, b: Double): Boolean ={
      Math.abs(a-b)<=tolerance
    }
    joinedDfs.take(10).foreach(row => {
      assert( isInValidRange(row.getDouble(1), row.getDouble(4)) , "Col1 validation. Row %s".format(row.getLong(0)+1))
      assert( isInValidRange(row.getDouble(2), row.getDouble(5)) , "Col2 validation. Row %s".format(row.getLong(0)+1))
      assert( isInValidRange(row.getDouble(3), row.getDouble(6)) , "Col3 validation. Row %s".format(row.getLong(0)+1))
    })

Note: asserts are not serializable, so they cannot run inside a distributed foreach over the DataFrame; the workaround is to collect the rows to the driver with take() first, as above, to avoid serialization errors.
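One caveat with the approach above: `monotonically_increasing_id()` only guarantees monotonically increasing, unique IDs, and the generated values depend on partitioning, so the indices of two DataFrames are not guaranteed to line up. A sketch of a more deterministic row index built with RDD `zipWithIndex`, assuming the row order within each DataFrame is meaningful:

    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{LongType, StructField}

    // Assign consecutive 0-based indices in RDD order and append them as a
    // "row" column, instead of relying on monotonically_increasing_id().
    def withRowIndex(df: DataFrame)(implicit spark: SparkSession): DataFrame = {
      val indexed = df.rdd.zipWithIndex.map { case (row, idx) =>
        Row.fromSeq(row.toSeq :+ idx)
      }
      spark.createDataFrame(indexed,
        df.schema.add(StructField("row", LongType, nullable = false)))
    }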