I would like to read a .csv file with Spark and associate the columns with fitting Types.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._

    val conf = new SparkConf()
        .setMaster("local[8]")
        .setAppName("Name")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val customSchema = StructType(Array(
        StructField("date", DateType, true),
        StructField("time",StringType, true),
        StructField("am", DoubleType, true),
        StructField("hum", DoubleType, true),
        StructField("temp", DoubleType, true)
    ))

    val df = sqlContext.read
            .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
            .option("header","true")
            .option("delimiter",";")
            .schema(customSchema)
            .load("data.csv")

A line of the .csv I am reading looks like this:

+----------+--------+-----+-----+-----+
|      date|    time|   am|  hum| temp|
+----------+--------+-----+-----+-----+
|04.10.2016|12:51:20|1.121|0.149|0.462|
+----------+--------+-----+-----+-----+

Spark will read the .csv and associate the types correctly if I set the type for the date column to String. If I keep the customSchema as in the code shown above, Spark will throw an exception because of the wrong date format (DateType expects yyyy-MM-dd, while mine is dd.MM.yyyy).

Is there a way to re-format the date strings to yyyy-MM-dd and apply the schema afterwards? Or can I alter the DateType parsing that Spark does, e.g. by adding parameters?

Thanks in advance

M-Tier

2 Answers

Use dateFormat option:

val df = sqlContext.read
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .option("header","true")
  .option("delimiter",";")
  .option("dateFormat", "dd.MM.yyyy")
  .schema(customSchema)
  .load("data.csv")
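
With that option, the schema above should apply cleanly; a quick sanity check (a sketch, assuming the sample file from the question) would look like this:

df.printSchema()
// root
//  |-- date: date (nullable = true)
//  |-- time: string (nullable = true)
//  |-- am: double (nullable = true)
//  |-- hum: double (nullable = true)
//  |-- temp: double (nullable = true)

df.show(1)
// +----------+--------+-----+-----+-----+
// |      date|    time|   am|  hum| temp|
// +----------+--------+-----+-----+-----+
// |2016-10-04|12:51:20|1.121|0.149|0.462|
// +----------+--------+-----+-----+-----+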
  • Thanks, that worked. Can you tell me if there is a list of available options for the DataFrameReader? I could not find one yet. – M-Tier Nov 28 '16 at 15:04

I recommend reading the date column as a string and parsing the dates afterwards. Also refer to this.

import org.apache.spark.sql.functions.unix_timestamp
import spark.implicits._ // for toDF and $; already in scope in spark-shell

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")

// parse the string column with a matching pattern; unparseable values become null
val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)
// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+
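
Applied to the question's data, a rough sketch of this "parse afterwards" approach (assuming the date and time columns are first read as plain strings, and keeping the reader setup from the question) could look like this:

import org.apache.spark.sql.functions.{col, concat_ws, unix_timestamp}

// read everything as strings first (no schema), then parse the date/time afterwards
val raw = sqlContext.read
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .option("header", "true")
  .option("delimiter", ";")
  .load("data.csv")

// combine the two string columns and parse them with the pattern matching dd.MM.yyyy
val parsed = raw.withColumn("ts",
  unix_timestamp(concat_ws(" ", col("date"), col("time")), "dd.MM.yyyy HH:mm:ss").cast("timestamp"))

parsed.select("ts").show(1, false)
// +---------------------+
// |ts                   |
// +---------------------+
// |2016-10-04 12:51:20.0|
// +---------------------+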

and, to format a date into a different pattern with java.time:

scala> val date = java.time.LocalDate.of(2020, 1, 1)
date: java.time.LocalDate = 2020-01-01
scala> date.format(java.time.format.DateTimeFormatter.ofPattern("yyyy.MM.dd"))
res0: String = 2020.01.01

Also, as a side note: since Spark 2.0 you use the SparkSession object only and use encoders for inferring the schema (instead of sc, sqlContext, etc.). Something like this:

import org.apache.spark.sql.SparkSession

case class User(id: Int, city: String, loc: Array[Double], pop: Long, state: String)
val spark = SparkSession.builder().appName("Name").master("local[8]").getOrCreate()
import spark.implicits._
val users = spark.read.option("inferSchema", "true").option("header", "true").csv("data/users1.csv").as[User]
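
For the question's file, the same SparkSession entry point combined with the dateFormat option from the other answer would look roughly like this (a sketch; with an explicit schema there is no need for inferSchema):

val df = spark.read
  .option("header", "true")
  .option("delimiter", ";")
  .option("dateFormat", "dd.MM.yyyy")
  .schema(customSchema)
  .csv("data.csv")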
jimseeve