I'm very new to both Spark and Scala, and am trying to load a CSV similar to:

A,09:33:57.570
B,09:43:02.577
...

The only temporal type with a time component I see in org.apache.spark.sql.types is TimestampType, so I am loading the CSV with:

import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("A", StringType, true), StructField("time", TimestampType, true)))

val table = spark.read.option("header","false").option("inferSchema","false").schema(schema).csv("../table.csv")

This seems to work fine until I call table.show() or table.take(5), at which point I get the following exception:

scala> table.show()
16/10/07 16:32:25 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException
        at java.sql.Date.valueOf(Date.java:143)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
        at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:287)
        at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:115)
        at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:84)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:125)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:124)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

Is there a preferred way of storing time-of-day data in Spark? I have also tried leaving the column as a string and mapping java.time.LocalTime.parse() over each value, but that fails with an error saying there is no Encoder for the type.

Tim

2 Answers


Spark SQL has no type that directly represents a time of day, so probably the best you can do is store it as a LongType by parsing with unix_timestamp. Read the time column as a string:

val schema = StructType(Array(StructField("A", StringType, true), StructField("time", StringType, true)))

This should result in a DataFrame similar to:

val df = Seq(("A", "09:33:57.570"), ("B", "09:43:02.577")).toDF("A", "time")

Define a simple time format:

val format = "HH:mm:ss.SSS"

and use it for parsing:

import org.apache.spark.sql.functions.unix_timestamp
df.withColumn("seconds", unix_timestamp($"time", format))

Unfortunately this is a lossy transformation, because unix_timestamp returns whole seconds and drops the milliseconds:

+---+------------+-------+
|  A|        time|seconds|
+---+------------+-------+
|  A|09:33:57.570|  30837|
|  B|09:43:02.577|  31382|
+---+------------+-------+

So if you want to preserve milliseconds, you can use java.time.LocalTime, as you tried, and store the result of toNanoOfDay:

import org.apache.spark.sql.functions.udf

val nanoOfDay = udf((s: String) =>
  java.time.LocalTime.parse(s).toNanoOfDay)

df.withColumn("nanoseconds", nanoOfDay($"time"))
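
To see what the UDF stores, here is a minimal sketch of the same conversion in plain Scala, without Spark. The object name TimeOfDay and the helpers fromNanoOfDay and toSecondOfDay are hypothetical names introduced for illustration; only the java.time.LocalTime calls come from the JDK. It shows that the Long value round-trips back to the original time, and how it compares to whole seconds since midnight.

```scala
import java.time.LocalTime

// Hypothetical helper object, for illustration only.
object TimeOfDay {
  // Parse an ISO-style time string (e.g. "09:33:57.570") into nanoseconds since midnight.
  def toNanoOfDay(s: String): Long = LocalTime.parse(s).toNanoOfDay

  // Inverse of toNanoOfDay: rebuild the LocalTime from the stored Long.
  def fromNanoOfDay(nanos: Long): LocalTime = LocalTime.ofNanoOfDay(nanos)

  // Whole seconds since midnight; the fractional part is discarded.
  def toSecondOfDay(s: String): Int = LocalTime.parse(s).toSecondOfDay

  def main(args: Array[String]): Unit = {
    val s = "09:33:57.570"
    val nanos = toNanoOfDay(s)
    println(nanos)                 // 34437570000000
    println(fromNanoOfDay(nanos))  // 09:33:57.570
    println(toSecondOfDay(s))      // 34437
  }
}
```

Note that toSecondOfDay gives 34437 for the first row, while the unix_timestamp column above shows 30837. The 3600-second difference suggests that output was produced in a UTC+1 time zone, since unix_timestamp interprets the string in the local time zone, whereas the LocalTime-based values do not depend on any zone.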
zero323

You might also want to look at Joda-Time for date/time manipulation. You can include it in your pom.xml (for Maven):

    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9</version>
    </dependency>

    <dependency>
        <groupId>org.joda</groupId>
        <artifactId>joda-convert</artifactId>
        <version>1.8.1</version>
    </dependency>
user3803714