Good evening.

I am doing some comparative work on the performance of RDDs, DataFrames and Datasets in Spark 2.1.0 (using the built-in Scala 2.11.8). I have downloaded some freely available data from https://data.london.gov.uk/dataset/smartmeter-energy-use-data-in-london-households and run the script shown further down on it. To give you a preview, the data looks as follows:

LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped
MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 01:00:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 01:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 02:00:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 02:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 03:00:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 03:30:00.0000000, 0 ,ACORN-A,Affluent
MAC000002,Std,2012-10-12 04:00:00.0000000, 0 ,ACORN-A,Affluent

To make the comparison, I time Spark at different stages of the import and transformation of the 6 variables shown above into the types [String, String, Timestamp, Double, String, String]. I have managed to map the data into a DataFrame and a Dataset, but cannot achieve the same with an RDD. Every time I try to convert the file into an RDD, I get the following error:

ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]

I am very confused, since the variable 'DateTime' is already in the timestamp format 'yyyy-mm-dd hh:mm:ss[.fffffffff]'. I have read posts such as these (Convert Date to Timestamp in Scala, How to convert unix timestamp to date in Spark, Spark SQL: parse timestamp without seconds), but they do not address my problem.

It's even more confusing because the case class 'londonDataSchemaDS' I defined works for my Dataset conversion but not for my RDD one.

This is the script I have used:

import java.io.File
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val sparkSession = SparkSession.builder.appName("SmartData London").master("local[*]").getOrCreate()

val LCLid = StructField("LCLid", DataTypes.StringType)
val stdorToU = StructField("stdorToU", DataTypes.StringType)
val DateTime = StructField("DateTime", DataTypes.TimestampType)
val KWHhh = StructField("KWH/hh (per half hour) ", DataTypes.DoubleType)
val Acorn = StructField("Acorn", DataTypes.StringType)
val Acorn_grouped = StructField("Acorn_grouped", DataTypes.StringType)

val fields = Array(LCLid,stdorToU,DateTime,KWHhh,Acorn,Acorn_grouped)
val londonDataSchemaDF = StructType(fields)

import sparkSession.implicits._

case class londonDataSchemaDS(LCLid: String, stdorToU: String, DateTime: java.sql.Timestamp, KWHhh: Double, Acorn: String, Acorn_grouped: String)

val t0 = System.nanoTime()

val loadFileRDD=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv")
.map(_.split(","))
.map(r=>londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).toDouble, r(4), r(5)))

val t1 = System.nanoTime()

val loadFileDF=sparkSession.read.schema(londonDataSchemaDF).option("header", true)
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv")

val t2=System.nanoTime()

val loadFileDS=sparkSession.read.option("header", "true")
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv")
.withColumn("DateTime", $"DateTime".cast("timestamp"))
.withColumnRenamed("KWH/hh (per half hour) ", "KWHhh")
.withColumn("KWHhh", $"KWHhh".cast("double"))
.as[londonDataSchemaDS]

val t3 = System.nanoTime()

loadFileRDD.take(10)

loadFileDF.show(10, false)
loadFileDF.printSchema()

loadFileDS.show(10, false)
loadFileDS.printSchema()

println("Time Elapsed to implement RDD: " + (t1 - t0) * 1E-9 + " seconds")
println("Time Elapsed to implement DataFrame: " + (t2 - t1) * 1E-9 + " seconds")
println("Time Elapsed to implement Dataset: " + (t3 - t2) * 1E-9 + " seconds")

Any help on this, or a nudge in the right direction, would be most appreciated.

Many thanks,

Christian

Christian Ivaha
1 Answer

I know what I did wrong. I was so caught up in the DataFrame and Dataset conversions, which have a built-in option to skip the header, that I forgot to remove the header in the RDD conversion.
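The failure can be reproduced without Spark. The sketch below is only an illustration: parseDateTime is a hypothetical helper that is not part of my script, and the two strings are the header and the first data row from the sample in the question. It applies Timestamp.valueOf to the third field of each line, as the RDD mapping does.

```scala
import java.sql.Timestamp
import scala.util.Try

// Header line and first data line, copied from the sample in the question.
val headerLine = "LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped"
val dataLine = "MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent"

// Hypothetical helper: parse the third comma-separated field as a Timestamp,
// capturing any exception instead of letting it propagate.
def parseDateTime(line: String): Try[Timestamp] =
  Try(Timestamp.valueOf(line.split(",")(2)))

// The header's third field is the text "DateTime", so parsing fails.
val headerResult = parseDateTime(headerLine)
// The data row's third field is a real timestamp, so parsing succeeds.
val dataResult = parseDateTime(dataLine)

println(headerResult)
println(dataResult)
```

Only the header line should end up as a Failure wrapping an IllegalArgumentException, which matches the exception reported for task 0.0.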

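By adding the lines below, I remove the header and successfully convert my CSV to an RDD. This also explains the Timestamp formatting error: without the filter, Timestamp.valueOf was applied to the header row, where the third field is the literal text 'DateTime' rather than a timestamp: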

val loadFileRDDwH=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv").map(_.split(","))

val header=loadFileRDDwH.first()

val loadFileRDD=loadFileRDDwH.filter(_(0) != header(0)).map(r=>
  londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).split("\\s+").mkString.toDouble, r(4), r(5)))
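The same filter-then-parse logic can be checked on a plain Scala collection before running it on the full file. This is a minimal sketch under stated assumptions: Reading is a hypothetical, simplified stand-in for londonDataSchemaDS, the Seq of lines replaces the textFile call, and trim is swapped in for split("\\s+").mkString to strip the spaces around the KWH value.

```scala
import java.sql.Timestamp

// Hypothetical record type, simplified from londonDataSchemaDS.
case class Reading(id: String, dateTime: Timestamp, kwh: Double)

// Stand-in for the lines of the CSV file; the real script reads them with textFile.
val lines = Seq(
  "LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped",
  "MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent",
  "MAC000002,Std,2012-10-12 01:00:00.0000000, 0 ,ACORN-A,Affluent"
)

val rows = lines.map(_.split(","))
// Plays the role of loadFileRDDwH.first() in the script above.
val header = rows.head

// Drop every row whose first field equals the header's first field, then parse.
val readings = rows
  .filter(_(0) != header(0))
  .map(r => Reading(r(0), Timestamp.valueOf(r(2)), r(3).trim.toDouble))

readings.foreach(println)
```

Comparing only the first field is enough here because no data row has 'LCLid' as its meter id; a file where that could happen would need a stricter check.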

Thanks for reading

Christian