
Suppose I have a few columns as below:

EMP_ID, EMP_NAME, EMP_CONTACT  
1, SIDDHESH, 544949461

Now I want to validate whether the data is in sync with the column schema. For example, the data in the EMP_NAME column should be a string only. I tried the code below after referring to this link, but it shows an error on the last line of my code.

package com.sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
class sample1 {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val data = spark.read.format("csv").option("header", "true").load("C:/Users/siddheshk2/Desktop/words.txt")
  val originalSchema = data.schema

  def validateColumns(row: Row): Row = {
    val emp_id = row.getAs[String]("EMP_ID")
    val emp_name = row.getAs[String]("EMP_NAME")
    val emp_contact = row.getAs[String]("EMP_CONTACT")

    // do checking here and populate (err_col,err_val,err_desc) with values if applicable

    Row.merge(row)
  }
  val validateDF = data.map { row => validateColumns(row) }

}  

So, it doesn't accept the last line of my code, val validateDF = data.map { row => validateColumns(row) }. How do I solve this? Or is there a more effective way of solving my problem?

I entered an invalid record (the 3rd one), as shown below:

EMP_ID,EMP_NAME,EMP_CONTACT
1,SIDDHESH,99009809
2,asdadsa, null
sidh,sidjh,1232  

In this case, I entered a string value in the EMP_ID column (which should be a number), so after checking the data against the column schema, the code should throw an error saying that the record doesn't match the schema.

whatsinthename
  • You can always add a schema or infer the schema from the file when reading the data. Please check: https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe – RudyVerboven Feb 27 '20 at 12:42
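
Following RudyVerboven's suggestion, here is a minimal sketch of providing a schema while reading the CSV; the column types below are assumptions based on the sample rows, not something stated in the question:

import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

// Assumed types for the three sample columns
val empSchema = StructType(Seq(
  StructField("EMP_ID", IntegerType, nullable = true),
  StructField("EMP_NAME", StringType, nullable = true),
  StructField("EMP_CONTACT", LongType, nullable = true)
))

val data = spark.read
  .format("csv")
  .option("header", "true")
  .schema(empSchema)
  .load("C:/Users/siddheshk2/Desktop/words.txt")

With the default PERMISSIVE parse mode, a value that cannot be cast to its declared type (such as sidh in EMP_ID) is read back as null rather than failing the load.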

2 Answers


You just missed converting your DataFrame to an RDD in order to apply the .map operation. Try this instead:

import org.apache.spark.sql.Row    
val validateDF = data.rdd.map { row => validateColumns(row) }

And if you want to convert it back to a DataFrame, just use your SparkSession for that:

val newSchema = // specify the schema of the new dataframe
val updatedDF = spark.createDataFrame(validateDF, newSchema)
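
As a sketch of one way to specify newSchema, assuming validateColumns merges the three error columns (err_col, err_val, err_desc) onto each row:

import org.apache.spark.sql.types.{StringType, StructField}

// Assumes validateColumns returns Row.merge(row, Row(errCol, errVal, errDesc)),
// so the new schema is the original one plus the three error columns
val newSchema = originalSchema
  .add(StructField("err_col", StringType, nullable = true))
  .add(StructField("err_val", StringType, nullable = true))
  .add(StructField("err_desc", StringType, nullable = true))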
Simon Delecourt

The Row class has a schema property. You can use it by iterating through the columns and comparing them. For that you can use the == operator, or one of the schema comparison methods described here.

The validate method could then look like this:

def isValid(row: Row): Boolean = originalSchema == row.schema
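
A usage sketch (hypothetical; note that rows pulled from the same DataFrame all carry originalSchema, so a check like this mainly matters for rows arriving from other sources):

// Keep only the rows whose runtime schema matches the original one
val validRows = data.rdd.filter(isValid)
println(s"valid row count: ${validRows.count()}")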
abiratsis
  • How do I handle this or implement this? Check my edited post – whatsinthename Feb 28 '20 at 06:10
  • How do I merge this into my code? I am new to Scala as well, so I'm not sure. – whatsinthename Feb 28 '20 at 06:19
  • Spark will try to infer the schema from the data itself. In your example, the whole EMP_ID column will be converted to string, since Spark sees that it contains a string value. This happens if you haven't specified the schema; alternatively, you can specify the schema explicitly. If you define EMP_ID as int, Spark will fail to load the data. To sum up, you don't need a validation function like the one above if you specify the correct schema – abiratsis Feb 28 '20 at 08:40
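
To illustrate that last comment, a short sketch of enforcing the schema at read time; with the parse mode set to FAILFAST, Spark throws on the malformed third record (sidh is not an int) instead of silently nulling it. The types are the same assumptions as above:

import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

val strictSchema = StructType(Seq(
  StructField("EMP_ID", IntegerType, nullable = true),
  StructField("EMP_NAME", StringType, nullable = true),
  StructField("EMP_CONTACT", LongType, nullable = true)
))

val strictDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST")
  .schema(strictSchema)
  .load("C:/Users/siddheshk2/Desktop/words.txt")

// The parse happens lazily, so the failure surfaces on the first action
strictDF.show()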