
I am trying to write some test cases to validate the data between a source (.csv) file and a target (Hive table). One of the validations is the structure validation of the table.

I have loaded the .csv data (using a defined schema) into one dataframe and extracted the Hive table data into another dataframe.
When I now try to compare the schemas of the two dataframes, the comparison returns false and I am not sure why. Any idea on this, please?

source dataframe schema:

scala> res39.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

target dataframe schema:

scala> targetRawData.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

When I compare, it returns false:

scala> res39.schema == targetRawData.schema
res47: Boolean = false

Data in the two dataframes is shown below:

scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS| Naveen |             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS| Naveen |             100|   115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25|        RBS|   Arun |             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|   Arun |             100|    30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12|        XZX|   Arun |             400|     12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+


scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS|  Naveen|             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS|  Naveen|             100|   115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25|        RBS|    Arun|             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|    Arun|             100|    30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+

The complete code looks like this:

//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    val sourceDataLocation = "hdfs://localhost:9000/source.txt"
    val targetTableName = "TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here
    // Test 2 - Validate the Structure
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       val data = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
       .map(_.split(",").toList)
       .map(row)

       val dataFrame = spark.createDataFrame(data,schema)
       val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
       data.collect
       data.getClass
    // Test 3 - Validate the data
    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - Test 5

  def UpdateResult(tableName: String, returnCode: Int, description: String){
    val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
    val a = hc.sql(insertString)

    }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }


  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

  }
  • I think you are comparing the object references instead of the values. Write your implementation of comparing – bigbounty Dec 18 '17 at 06:13
  • I am new to Spark; can you please suggest how I can do the structure validation? I want to compare the column names, their datatypes, and the nullable field. – Arun Dec 18 '17 at 06:28
  • `res39.schema == targetRawData.schema` should work for what you want to do. There must be something else wrong. Did you use the predefined schema when reading both dataframes? – Shaido Dec 18 '17 at 06:30
  • @Shaido: For reading the csv table I used my defined schema; for reading the hive table I didn't use any schema, just a `select * from tablea` query. – Arun Dec 18 '17 at 06:44
  • @Arun: An idea for testing: try selecting one column at a time from both dataframes and compare the schemas, to find out whether all of them mismatch or only a single one. E.g. `res39.select("datetime").schema == targetRawData.select("datetime").schema` – Shaido Dec 18 '17 at 06:47
  • the individual column schema comparison also returns false :( – Arun Dec 18 '17 at 07:12
  • I have added the complete code in the comment if that is of any help. – Arun Dec 18 '17 at 11:41
  • How about a solution in PySpark too? – petroslamb Mar 11 '20 at 15:10

6 Answers

14

Based on @Derek Kaknes's answer, here's the solution I came up with for comparing schemas, concerned only with column name, datatype & nullability and ignoring metadata:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructField}

def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
    df.schema.map { (structField: StructField) =>
      structField.name.toLowerCase -> (structField.dataType, structField.nullable)
    }.toMap
  }

// Compare relevant information
def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                        schema2: Map[String, (DataType, Boolean)]
                       ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  (schema1.keys ++ schema2.keys).
    map(_.toLowerCase).
    toList.distinct.
    flatMap { (columnName: String) =>
      val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
      val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)

      if (schema1FieldOpt == schema2FieldOpt) None
      else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
    }.toMap
}
  • The getCleanedSchema method extracts the information of interest (column datatype & nullability) and returns a map from lower-cased column name to a (dataType, nullable) tuple

  • The getSchemaDifference method returns a map containing only those columns that differ between the two schemas. If a column is absent from one of the two schemas, its corresponding properties will be None. A usage sketch follows below.
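
For reference, a minimal usage sketch with the two dataframes from the question (an empty result means the schemas agree on name, type and nullability):

val sourceSchema = getCleanedSchema(res39)
val targetSchema = getCleanedSchema(targetRawData)

val mismatches = getSchemaDifference(sourceSchema, targetSchema)
mismatches.foreach { case (column, (source, target)) =>
  println(s"$column differs: source=$source, target=$target")
}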

– y2k-shubham
  • This works fine for a flat schema, but can we make it work for the nested schema we get when reading a JSON file? – Nikunj Kakadiya May 18 '22 at 07:29
8

I've just had the exact same problem. When you read data from Hive, the schema's StructField components will sometimes contain Hive metadata in the metadata field. You can't see it when printing the schemas, because this field is not part of the toString definition.

Here is the solution I decided to use: I just get a copy of the schema with empty metadata before comparing it:

schema.map(_.copy(metadata = Metadata.empty))
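
A minimal comparison sketch based on this, using the question's dataframes (since mapping over a StructType yields a Seq[StructField], the result is wrapped back into a StructType before comparing):

import org.apache.spark.sql.types.{Metadata, StructType}

// strip the Hive metadata from both schemas, then compare
val sourceSchema = StructType(res39.schema.map(_.copy(metadata = Metadata.empty)))
val targetSchema = StructType(targetRawData.schema.map(_.copy(metadata = Metadata.empty)))
sourceSchema == targetSchema  // true if only the metadata differed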
– cheseaux
  • Nice one. It could be worth noting that this unwraps the `StructType` from around the `Seq[StructField]`. – kfkhalili Jul 25 '19 at 15:08
5

I've had this issue before, and it was caused by differences in the StructField.metadata attribute. It is almost impossible to identify this out of the box, as a simple inspection of the StructFields will only show the name, datatype and nullable values. My suggestion for debugging it would be to compare the metadata of your fields. Something like this maybe:

res39.schema.zip(targetRawData.schema).foreach{ case (r: StructField, t: StructField) => 
  println(s"Field: ${r.name}\n--| res_meta: ${r.metadata}\n--|target_meta: ${t.metadata}")}

If you want to compare schemas but ignore metadata, then I don't have a great solution. The best that I've been able to come up with is to iterate over the StructFields and manually remove the metadata, then create a temporary copy of the dataframe without metadata. So you can do something like this (assuming that df is the dataframe you want to strip of metadata):

val schemaWithoutMetadata = StructType(df.schema.map{ case f: StructField => 
  StructField(f.name, f.dataType, f.nullable)
})
val tmpDF = spark.sqlContext.createDataFrame(df.rdd, schemaWithoutMetadata)

Then you can either compare the dataframes directly or compare the schemas the way you have been attempting. I assume that this solution would not be performant, so should only be used on small datasets.
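
As a rough usage sketch of that approach (stripMetadata is just a helper name chosen here; res39 and targetRawData are the dataframes from the question):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

def stripMetadata(df: DataFrame): DataFrame = {
  // rebuild the schema without the metadata attribute and re-create the dataframe
  val cleanSchema = StructType(df.schema.map(f => StructField(f.name, f.dataType, f.nullable)))
  spark.sqlContext.createDataFrame(df.rdd, cleanSchema)
}

stripMetadata(res39).schema == stripMetadata(targetRawData).schema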

– Derek Kaknes
5

Option 1 - StructField.toString

Here is another solution, based on the observation that the string representation of name + DataType + nullable is unique for each column. As seen here, the toString implementation of StructField already supports that rule, therefore we can use it directly to compare the columns of different schemas:

import org.apache.spark.sql.types.{StructType, StructField}

val schemaDiff = (s1 :StructType, s2 :StructType) => {
      val s1Keys = s1.map{_.toString}.toSet
      val s2Keys = s2.map{_.toString}.toSet
      val commonKeys =  s1Keys.intersect(s2Keys)

      val diffKeys = s1Keys ++ s2Keys -- commonKeys

      (s1 ++ s2).filter(sf => diffKeys.contains(sf.toString)).toList
}

Notice that the field name comparison is case sensitive, hence column names that differ only in case count as different columns. A case-insensitive variant is sketched after the steps below.

The steps:

  1. For each schema generate a set of keys where each key has the format StructField($name,$dataType,$nullable)
  2. Get intersection of keys
  3. Subtract the intersection from the union of keys; this gives us the key difference (diffKeys)
  4. Finally, from both schemas keep only the elements whose string representation exists in diffKeys
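
If a case-insensitive comparison is wanted (an addition, not part of the original answer), one option is to lower-case the field names before building the key sets:

// lower-case the field name before generating the string keys
val s1Keys = s1.map(f => f.copy(name = f.name.toLowerCase).toString).toSet
val s2Keys = s2.map(f => f.copy(name = f.name.toLowerCase).toString).toSet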

Option 2 - case class, eq, ==

StructField and StructType are both case classes, therefore their equals method and the == operator compare them structurally, based on the values of their members. You can confirm that by applying the change that @cheseaux pointed out, for example:

val s1 = StructType(res39.schema.map(_.copy(metadata = Metadata.empty)))
val s2 = StructType(targetRawData.schema.map(_.copy(metadata = Metadata.empty)))

s1 == s2 // true 

This is expected, since == can be applied between two lists of case classes and returns true only if both lists contain identical items. In the previous case the == operator is applied between two StructType objects, and consequently between two Seq[StructField] objects, as we can see from the constructor definition. As already discussed, the comparison in your case was failing because the value of the metadata differed between the schemas.

Note that the == operator is not safe between schemas if the order of the columns differs, because the list implementation of == takes the order of the items into account as well. To overcome that, we can convert the collections into sets with toSet, as shown above.

Finally, we can take advantage of the above observations and rewrite the first version into the next one:

val schemaDiff = (s1 :StructType, s2 :StructType) => {
      val s1Set = s1.map(_.copy(metadata = Metadata.empty)).toSet
      val s2Set = s2.map(_.copy(metadata = Metadata.empty)).toSet
      val commonItems =  s1Set.intersect(s2Set)

      (s1Set ++ s2Set -- commonItems).toList
}

The performance drawback of the second option is that we need to recreate every StructField item by setting metadata = Metadata.empty.

– abiratsis
1
val csDf = res39       // any source dataframe
val myDf = targetRawData   // target data frame

val csFields = csDf.schema.fields
val myFields = myDf.schema.fields

val csFieldNameTypeMap = csFields.map(f => f.name -> f.dataType).toMap
val myFieldNameTypeMap = myFields.map(f => f.name -> f.dataType).toMap

val diffFields = csFields.filter(f => csFieldNameTypeMap.get(f.name) != myFieldNameTypeMap.get(f.name)).toList
val diffFieldsMyDf = myFields.filter(f => csFieldNameTypeMap.get(f.name) != myFieldNameTypeMap.get(f.name)).toList

'diffFields' and 'diffFieldsMyDf' will give you the fields whose data type differs from the corresponding field in the other dataframe. Similar steps can be done for checking 'nullable' as well; just replace 'dataType' with 'nullable'.
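
For instance, the nullable variant of the same steps might look like this:

val csFieldNullableMap = csFields.map(f => f.name -> f.nullable).toMap
val myFieldNullableMap = myFields.map(f => f.name -> f.nullable).toMap

val diffNullableFields = csFields.filter(f => csFieldNullableMap.get(f.name) != myFieldNullableMap.get(f.name)).toList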

-2

This is a Java-level object comparison problem; you could try .equals(). This mostly works unless different source types introduce metadata or nullability issues.
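
A minimal sketch of that suggestion, using the question's dataframes (note that for Scala case classes equals and == behave the same, so this will still return false when the metadata differs, as it does in the question):

res39.schema.equals(targetRawData.schema)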

– aasthetic