
I have an input file (CSV) containing up to 20 columns. I have to filter the input file based on the number of columns: if a row contains 20 columns it is considered good data, otherwise bad data.

Input File:

123456,"ID_SYS",12,"Status_code","feedback","HIGH","D",""," ",""," ",""," 
",9999," ",2013-05-02,9999-12-31,"N",1,2

I am reading the file as an RDD, splitting each line on `,`, and checking whether the row contains 20 columns:

import org.apache.spark.sql.Row

val rdd = SparkConfig.spark.sparkContext.textFile(CommonUtils.loadConf.getString("conf.inputFile"))
val splitRDD = rdd.map(line => Row.fromSeq(line.split(",")))
val goodRDD = splitRDD.filter(row => row.size == 20)   // keep only rows with exactly 20 columns

I have to convert goodRDD into a DataFrame/Dataset to apply some transformations. I tried the code below:

val rowRdd = splitRDD.map {
  case Array(c1, c2, c3 .... c20) => Row(c1.toInt, c2 ....)
  case _ => badCount++
}
val ds = SparkConfig.spark.sqlContext.createDataFrame(rowRdd, inputFileSchema)

Since there are 20 columns, do I really have to write out all 20 columns in the pattern match? I would like to know the best way / the right solution.

Niketa
  • A plain CSV reader with `mode` set to `DROPMALFORMED` usually makes more sense (as per https://stackoverflow.com/questions/29704333/spark-load-csv-file-as-dataframe and https://stackoverflow.com/questions/34347448/spark-sql-loading-csv-psv-files-with-some-malformed-records), but otherwise you're limited by the API (https://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark). The only reasonable improvement here is to replace `Row(c1.toInt,c2....)` with `Row.fromSeq` and simply add a guard expression: `case xs if xs.size == 20 => Row.fromSeq(xs)` (see the first sketch after these comments) – zero323 Nov 12 '18 at 16:19
  • Thanks for the response. I also want the count of bad rows (rows containing < 20 columns). If I use `case xs if xs.size == 20 => Row.fromSeq(xs)`, I still have to cast some of the values, like `c1.toLong` (the second sketch below touches on both points). – Niketa Nov 12 '18 at 16:56
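
A minimal sketch of the CSV-reader route from the first comment, assuming a SparkSession is available as SparkConfig.spark and that inputFileSchema is the 20-column StructType already referenced in the question; the exact behaviour of DROPMALFORMED differs slightly between Spark versions:

// Let Spark's CSV source do the parsing; rows that cannot be parsed into
// the 20-column schema are discarded by DROPMALFORMED.
val goodDF = SparkConfig.spark.read
  .schema(inputFileSchema)
  .option("mode", "DROPMALFORMED")
  .csv(CommonUtils.loadConf.getString("conf.inputFile"))

Unlike a raw line.split(","), the CSV source also handles quoted fields that contain commas, which matters for sample rows like the one above.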
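And a sketch of the follow-up comment's two requirements (a bad-row count plus casting), reusing the question's rdd and inputFileSchema; the cast of the first column to Long is only an example, and accumulator values updated inside a transformation are reliable only after an action has run (and can over-count if tasks are retried):

import org.apache.spark.sql.Row

val badRows = SparkConfig.spark.sparkContext.longAccumulator("badRows")

val rowRdd = rdd.map(_.split(",", -1)).flatMap {        // -1 keeps trailing empty fields
  case xs if xs.length == 20 =>
    // Cast only the columns whose schema type is not String,
    // e.g. the first column to Long; this throws if the value is not numeric.
    Some(Row.fromSeq(xs.toSeq.updated(0, xs(0).toLong)))
  case _ =>
    badRows.add(1)                                       // row does not have 20 columns
    None
}

val ds = SparkConfig.spark.sqlContext.createDataFrame(rowRdd, inputFileSchema)
ds.count()                                               // force evaluation before reading the accumulator
println(s"bad rows: ${badRows.value}")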
