Suppose I have a few columns as below:
EMP_ID, EMP_NAME, EMP_CONTACT
1, SIDDHESH, 544949461
Now I want to validate whether the data is in sync with the column schema. For example, the data in the EMP_NAME column should be a string only. I tried the code below after referring to this link, but it shows an error on the last line.
package com.sample

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row

class sample1 {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val data = spark.read
    .format("csv")
    .option("header", "true")
    .load("C:/Users/siddheshk2/Desktop/words.txt")
  val originalSchema = data.schema

  def validateColumns(row: Row): Row = {
    val emp_id = row.getAs[String]("EMP_ID")
    val emp_name = row.getAs[String]("EMP_NAME")
    val emp_contact = row.getAs[String]("EMP_CONTACT")
    // do checking here and populate (err_col, err_val, err_desc) with values if applicable
    Row.merge(row)
  }

  val validateDF = data.map { row => validateColumns(row) }
}
So it doesn't accept the last line of my code, val validateDF = data.map { row => validateColumns(row) }. How do I solve this? Or is there a more effective way of solving my problem?
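From what I understand, Dataset.map on a DataFrame needs an implicit Encoder[Row] in scope, which Spark does not provide by default. A sketch of what I might try (assuming Spark 2.x, where RowEncoder can build an encoder from a schema; in newer versions the entry point may differ):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// data.map needs an implicit Encoder[Row]; build one from the existing schema
// (originalSchema is the schema captured from the loaded DataFrame above)
implicit val rowEncoder = RowEncoder(originalSchema)

val validateDF = data.map { row => validateColumns(row) }
```

Another option I've seen is dropping to the RDD API (data.rdd.map(validateColumns)), which does not require an encoder at all, but then the result is an RDD[Row] rather than a DataFrame.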
I entered an invalid record (the 3rd one) like below:
EMP_ID,EMP_NAME,EMP_CONTACT
1,SIDDHESH,99009809
2,asdadsa, null
sidh,sidjh,1232
In this case, I entered a string value in the EMP_ID column (which should be a number), so after checking the data against the column schema, validation should throw an error saying that the record doesn't match the schema.
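An alternative approach that avoids map and encoders entirely: cast each column to its expected type and flag rows where the cast yields null while the original value was non-null, since a failed cast in Spark produces null rather than an exception. A sketch under that assumption, using the column names above:

```scala
import org.apache.spark.sql.functions.{col, lit, when}

// A value like "sidh" cannot be cast to int, so the cast returns null;
// a non-null original with a null cast result signals a schema mismatch.
val checked = data.withColumn(
  "err_desc",
  when(col("EMP_ID").isNotNull && col("EMP_ID").cast("int").isNull,
    lit("EMP_ID is not a number"))
)

// Records that violate the expected types (here: the "sidh" row)
val invalid = checked.filter(col("err_desc").isNotNull)
invalid.show()
```

The same when/cast pattern could be repeated per column to populate the (err_col, err_val, err_desc) fields mentioned in the comment in my code.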