2

I want to transform an RDD into a Dataset with custom columns using the Spark SQL native function toDS().

I don't have any errors at compilation time, but at runtime, I got the error No Encoder found for java.time.LocalDate.
Bellow, the full stack trace log:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_1")
- root class: "scala.Tuple3"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:49)
    at observatory.Extraction$.locationYearlyAverageRecords(Extraction.scala:114)
    at observatory.Extraction$.processExtraction(Extraction.scala:28)
    at observatory.Main$.delayedEndpoint$observatory$Main$1(Main.scala:18)
    at observatory.Main$delayedInit$body.apply(Main.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at observatory.Main$.main(Main.scala:7)
    at observatory.Main.main(Main.scala)

The structure of my RDD is composed of three columns, based on Tuple3 where the signature is:

type TemperatureRecord = (LocalDate, Location, Double)

Field LocalDate is the Java Object coming from package java.time.LocalDate.
Field Location is a custom type made with two Double (GPS coordinates) having this signature:

case class Location(lat: Double, lon: Double)

Below, one sample row:

(1975-01-01, Location(70.933,-8.667), -4.888888888888889)


Some details about my application / environment:

  • Scala: 2.11.8
  • Spark core: 2.1.1
  • Spark SQL: 2.1.1
  • Linux Ubuntu: 16.04 LTS

I have read from this article How to store custom objects in Dataset? that I need to define custom Encoder, but I don't have any idea :(.

JimyRyan
  • 359
  • 1
  • 2
  • 17

1 Answers1

1

The problem is that Spark does not find an encoder for regular classes. As of today Spark only allows to use primitive types for encoders and there is no good support for custom classes.

As for your case, given your "custom" class represents a date you can use java.sql.date instead java.time.LocalDate. The benefit is that you can take advantage of encoders already provided by Spark.

import java.sql.Date
case class TempRow(date: Date, loc: Location, temp: Double)

val ds = Seq(TempRow(java.sql.Date.valueOf("2017-06-01"), 
 Location(1.4,5.1), 4.9), TempRow(java.sql.Date.valueOf("2014-04-05"),
 Location(1.5,2.5), 5.5))
 .toDS

ds.show()

+----------+---------+----+
|      date|      loc|temp|
+----------+---------+----+
|2017-06-01|[1.4,5.1]| 4.9|
|2014-04-05|[1.5,2.5]| 5.5|
+----------+---------+----+

Check the schema:

ds.printSchema()

root
 |-- date: date (nullable = true)
 |-- loc: struct (nullable = true)
 |    |-- i: double (nullable = false)
 |    |-- j: double (nullable = false)
 |-- temp: double (nullable = false)

For more general cases, there is one trick that you can perform to store the majority of custom classes in a Spark dataset. Bear in mind that it does not work for all cases because you need to use a string as an intermediate representation of your custom object. I hope this issue will be solved in the future because it is really a pain.

Find below one solution for your case:

case class Location(val i: Double, val j: Double)
class TempRecord(val date: java.time.LocalDate, val loc: Location, val temp: Double)
type TempSerialized = (String, Location, Double)

implicit def fromSerialized(t: TempSerialized): TempRecord = new TempRecord(java.time.LocalDate.parse(t._1), t._2, t._3)
implicit def toSerialized(t: TempRecord): TempSerialized = (t.date.toString, t.loc, t.temp)

// Finally we can create datasets
val d = spark.createDataset(Seq[TempSerialized](
  new TempRecord(java.time.LocalDate.now, Location(1.0,2.0), 3.0), 
  new TempRecord(java.time.LocalDate.now, Location(5.0,4.0), 4.0) )
).toDF("date", "location", "temperature").as[TempSerialized]

d.show()

+----------+---------+-----------+
|      date| location|temperature|
+----------+---------+-----------+
|2017-07-11|[1.0,2.0]|        3.0|
|2017-07-11|[5.0,4.0]|        4.0|
+----------+---------+-----------+

d.printSchema()

root
 |-- date: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- i: double (nullable = false)
 |    |-- j: double (nullable = false)
 |-- temperature: double (nullable = false)
Luis
  • 503
  • 5
  • 11
  • Hello, first, thank you for your answer. I see the date is store as String, did you know how use the custom Encoder to create my custom representation ? – JimyRyan Jul 18 '17 at 12:43
  • As far as I know and correct me if I am wrong, there is no way to store pure custom objects in Spark dataset. Although, they claim that you could store any custom object inside a dataset, the members of these objects must be primitive values that have already an encoder such as integers, string, floats,... – Luis Jul 20 '17 at 10:44