
Would you be able to help with this Spark problem statement?

Data -

empno|ename|designation|manager|hire_date|sal|deptno    
7369|SMITH|CLERK|9902|2010-12-17|800.00|20
7499|ALLEN|SALESMAN|9698|2011-02-20|1600.00|30

Code:

val rawrdd = spark.sparkContext.textFile("C:\\Users\\cmohamma\\data\\delta scenarios\\emp_20191010.txt")

val refinedRDD = rawrdd.map( lines => {
  val fields = lines.split("\\|")
  (fields(0).toInt, fields(1), fields(2), fields(3).toInt, fields(4).toDate, fields(5).toFloat, fields(6).toInt)
})

Problem Statement - This is not working: fields(4).toDate. What is the alternative, or what is the correct usage?
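There is no .toDate method on String, and to_date/col are DataFrame/Column functions, so they cannot be applied to a plain String inside an RDD map. A minimal sketch of one alternative, assuming the dates are always in yyyy-MM-dd form (java.sql.Date.valueOf parses exactly that format):

// Sketch (assumes yyyy-MM-dd dates; if the file contains the header row, filter it out first)
val refinedRDD = rawrdd.map { line =>
  val fields = line.split("\\|")
  (fields(0).trim.toInt,
   fields(1),
   fields(2),
   fields(3).trim.toInt,
   java.sql.Date.valueOf(fields(4).trim),   // replaces the nonexistent fields(4).toDate
   fields(5).trim.toFloat,
   fields(6).trim.toInt)
}

In Spark 2.x the SQL DateType corresponds to java.sql.Date on the Scala side, which is why that type is used here.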

What have I tried?

1. Tried replacing it with to_date(col(fields(4)), "yyy-MM-dd") - not working.

2.

Step 1.

val refinedRDD = rawrdd.map( lines => {
  val fields = lines.split("\\|")
  (fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6))
})

Now these tuple fields are all strings.

Step 2.

val mySchema = StructType(Seq(
  StructField("empno", IntegerType, true), StructField("ename", StringType, true),
  StructField("designation", StringType, true), StructField("manager", IntegerType, true),
  StructField("hire_date", DateType, true), StructField("sal", DoubleType, true),
  StructField("deptno", IntegerType, true)))

Step 3. Converting the string tuples to Rows

val rowRDD = refinedRDD.map(attributes => Row(attributes._1, attributes._2, attributes._3, attributes._4, attributes._5 , attributes._6, attributes._7))

Step 4.

val empDF = spark.createDataFrame(rowRDD, mySchema)

This is also not working and gives an error related to types. To solve this, I changed Step 1 to:

(fields(0).toInt,fields(1),fields(2),fields(3).toInt,fields(4),fields(5).toFloat,fields(6).toInt)

Now this gives an error for the date column and I am back at the main problem.

Use case - use the textFile API and convert this to a DataFrame using a custom schema (StructType) on top of it.

This can be done using a case class, but with a case class I would also be stuck where I need to do fields(4).toDate (I know I can cast the string to a date later in the code, but I want to know whether a solution to the above problem is possible). A sketch tying the steps above together is shown below.
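Putting the steps above together, here is a hedged sketch of the whole use case (textFile API plus a custom StructType), under the assumptions that the file starts with the header row and that dates are always yyyy-MM-dd. Note the schema uses DoubleType, so the salary field is parsed with toDouble rather than toFloat:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rawrdd = spark.sparkContext.textFile("C:\\Users\\cmohamma\\data\\delta scenarios\\emp_20191010.txt")

// drop the header line before parsing (assumption: the file starts with the column names)
val header = rawrdd.first()
val rowRDD = rawrdd.filter(_ != header).map { line =>
  val f = line.split("\\|")
  Row(f(0).trim.toInt, f(1), f(2), f(3).trim.toInt,
      java.sql.Date.valueOf(f(4).trim),   // DateType expects java.sql.Date
      f(5).trim.toDouble,                 // the schema says DoubleType, so toDouble
      f(6).trim.toInt)
}

val mySchema = StructType(Seq(
  StructField("empno", IntegerType, true),
  StructField("ename", StringType, true),
  StructField("designation", StringType, true),
  StructField("manager", IntegerType, true),
  StructField("hire_date", DateType, true),
  StructField("sal", DoubleType, true),
  StructField("deptno", IntegerType, true)))

val empDF = spark.createDataFrame(rowRDD, mySchema)
empDF.printSchema()

A case class plus toDF() would work the same way: parse the date with java.sql.Date.valueOf inside the map and declare the field as java.sql.Date in the case class.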

  • Why don't you simply read it as CSV, with inferSchema or by providing a custom schema? `val df = spark.read.option("delimiter", "|").option("header", true).option("inferSchema", "true").csv(path)` should be enough to read a DataFrame. – koiralo Nov 14 '19 at 15:35
  • @ShankarKoirala Because the file is not a CSV, it is a .dat file with a pipe delimiter. I am creating a custom schema in Step 2 and getting an error in Step 3 when I convert the tuples to Rows to create a DataFrame using that RDD of Rows. Do you know a way to attach a custom schema to an RDD in order to create a DataFrame? – Chand Mohammad Nov 19 '19 at 18:13

2 Answers


You can use the following code snippet

import org.apache.spark.sql.functions.to_timestamp

scala> val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").load("gs://otif-etl-input/test.csv")
df: org.apache.spark.sql.DataFrame = [empno: string, ename: string ... 5 more fields]

scala> val ts = to_timestamp($"hire_date", "yyyy-MM-dd")
ts: org.apache.spark.sql.Column = to_timestamp(`hire_date`, 'yyyy-MM-dd')

scala> df.withColumn("ts", ts).show(2, false)
+-----+-----+-----------+-------+----------+-------+----------+-------------------+
|empno|ename|designation|manager|hire_date |sal    |deptno    |ts                 |
+-----+-----+-----------+-------+----------+-------+----------+-------------------+
|7369 |SMITH|CLERK      |9902   |2010-12-17|800.00 |20        |2010-12-17 00:00:00|
|7499 |ALLEN|SALESMAN   |9698   |2011-02-20|1600.00|30        |2011-02-20 00:00:00|
+-----+-----+-----------+-------+----------+-------+----------+-------------------+

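If a DateType column is wanted rather than a timestamp, a similar hedged sketch using to_date plus casts for the numeric columns should also work (column names as in the question's data; typedDF is an illustrative name):

import spark.implicits._                       // not needed in spark-shell
import org.apache.spark.sql.functions.to_date

val typedDF = df
  .withColumn("hire_date", to_date($"hire_date", "yyyy-MM-dd"))   // DateType instead of TimestampType
  .withColumn("empno", $"empno".cast("int"))
  .withColumn("sal", $"sal".cast("double"))
typedDF.printSchema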
Jayadeep Jayaraman
  • Thanks for your response, really appreciate it. As I mentioned, I know I can cast the string to a date later in the code, but I want to know whether the above problem can be solved directly. The data file is basically in .dat format and the delimiter is a pipe. – Chand Mohammad Nov 19 '19 at 18:06
  • The file need not be a CSV; it can be any text file. We are using the CSV parser to process it, so the above code should work with .dat files as well. If this answer was helpful, do accept it. – Jayadeep Jayaraman Nov 19 '19 at 18:15
  • Many thanks for clearing up my head, now I totally understand the CSV wrapper. This is working great. Thanks again. – Chand Mohammad Nov 19 '19 at 18:40
  • Good to know it worked, please do accept the answer. – Jayadeep Jayaraman Nov 19 '19 at 18:41
  • By using the CSV parser over the .txt or .dat file, I can infer the schema and can attach a custom schema as well. – Chand Mohammad Nov 19 '19 at 18:42

There are multiple ways to cast your data to proper data types.

First: use inferSchema

val df = spark.read.option("delimiter", "|").option("header", true).option("inferSchema", "true").csv(path)
df.printSchema

Sometimes it doesn't work as expected; see details here.
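If the extra pass over the data that inferSchema triggers is a concern, the same reader can be given an explicit schema instead. A minimal sketch, assuming the column names from the question and yyyy-MM-dd dates (empSchema and path are illustrative names):

import org.apache.spark.sql.types._

val empSchema = StructType(Seq(
  StructField("empno", IntegerType, true),
  StructField("ename", StringType, true),
  StructField("designation", StringType, true),
  StructField("manager", IntegerType, true),
  StructField("hire_date", DateType, true),
  StructField("sal", DoubleType, true),
  StructField("deptno", IntegerType, true)))

val df = spark.read
  .option("delimiter", "|")
  .option("header", true)
  .option("dateFormat", "yyyy-MM-dd")   // how the CSV parser should read hire_date into DateType
  .schema(empSchema)
  .csv(path)
df.printSchema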

Second: provide your own datatype conversion template

val rawDF = Seq(("7369", "SMITH" , "2010-12-17", "800.00"), ("7499", "ALLEN","2011-02-20", "1600.00")).toDF("empno", "ename","hire_date", "sal")
//define schema in DF , hire_date as Date
val schemaDF = Seq(("empno", "INT"), ("ename", "STRING"),  (**"hire_date", "date"**) , ("sal", "double")).toDF("columnName", "columnType")
rawDF.printSchema


//fetch schema details
val dataTypes = schemaDF.select("columnName", "columnType")
val listOfElements = dataTypes.collect.map(_.toSeq.toList)

//create a map-friendly casting template
val validationTemplate = (c: Any, t: Any) => {
  val column = c.asInstanceOf[String]
  val typ = t.asInstanceOf[String]
  col(column).cast(typ)
}

//apply the datatype conversion template on rawDF
val convertedDF = rawDF.select(listOfElements.map(element => validationTemplate(element(0), element(1))): _*)
println("Conversion done!")
convertedDF.show()
convertedDF.printSchema
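For the question's pipe-delimited .dat file, the same template could presumably be applied after reading every column as a string. A hedged usage sketch (the path is hypothetical; validationTemplate is the function defined above):

val stringDF = spark.read
  .option("delimiter", "|")
  .option("header", true)
  .csv("/path/to/emp_20191010.dat")   // hypothetical path; the CSV parser reads plain text files too

val empTypes = Seq(
  ("empno", "int"), ("ename", "string"), ("designation", "string"), ("manager", "int"),
  ("hire_date", "date"), ("sal", "double"), ("deptno", "int"))

// cast("date") on a yyyy-MM-dd string yields a proper DateType column
val empDF = stringDF.select(empTypes.map { case (c, t) => validationTemplate(c, t) }: _*)
empDF.printSchema

The appeal of this pattern is that the target types live in data (or a config table) rather than in code.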


Third: case class

Create a schema from a case class with ScalaReflection and provide this customized schema while loading the DataFrame.

  import org.apache.spark.sql.catalyst.ScalaReflection
  import org.apache.spark.sql.types._
  import java.sql.Date

  case class MySchema(empno: Int, ename: String, hire_date: Date, sal: Double)

  val schema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]

  val rawDF = spark.read.schema(schema).option("header", "true").option("delimiter", "|").csv(path)
  rawDF.printSchema
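As a small hedged follow-up (assuming the read succeeds with the columns lining up with the case class), the same DataFrame can also be viewed as a typed Dataset; the encoder for java.sql.Date comes from spark.implicits._:

  import spark.implicits._

  val empDS = rawDF.as[MySchema]   // Dataset[MySchema]; hire_date stays a java.sql.Date
  empDS.printSchema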

Hope this will help.

Ehsan Ullah Nazir
  • Thanks a lot for your detailed answer. Please find my queries/responses below: – Chand Mohammad Nov 19 '19 at 18:20
  • Thanks for your detailed answer. Please find my responses below. Soln 1 - The file is a .dat file which can be read using the textFile API, so there is no point in inferSchema; I don't think I can apply a schema like that. In the case-class approach, as you can see in the problem, I have actually done that, however I am stuck when the output of my split field (a string) is mapped to the case class parameter (Date). I see one solution: process the RDD date field as a string, convert the RDD to a DataFrame, and then cast the column in SQL queries or Spark functional SQL. – Chand Mohammad Nov 19 '19 at 18:30
  • Hey Ehsan, I may not use inferSchema in case my dataset is huge; I would like to keep my transformations lazy (inferSchema reads all the data from disk). I was under the impression that only CSV files could be parsed using the CSV wrapper. Thanks for your response, I have learned a few dynamic concepts from it. – Chand Mohammad Nov 19 '19 at 18:47