17
  • Spark 2.1.1
  • Scala 2.11.8
  • Java 8
  • Linux Ubuntu 16.04 LTS

I'd like to transform my RDD into a Dataset. For this, I use the implicit method `toDS()`, which gives me the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "date")
- root class: "observatory.TemperatureRow"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)

In my case, I must use the type java.time.LocalDate; I can't use java.sql.Date. I have read that I need to inform Spark how to transform the Java type into a SQL type, so in that direction I built the two implicit functions below:

implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow = EncodedTemperatureRow(t.date.toString, t.location, t.temperature)
implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow = TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)

Below, some code about my application:

case class Location(lat: Double, lon: Double)

case class TemperatureRow(
                             date: LocalDate,
                             location: Location,
                             temperature: Double
                         )

case class EncodedTemperatureRow(
                             date: String,
                             location: Location,
                             temperature: Double
                         )

val s = Seq[TemperatureRow](
                    TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4,5.1), 4.9),
                    TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5,2.5), 5.5)
                )

import spark.implicits._
val temps: RDD[TemperatureRow] = sc.parallelize(s)
val tempsDS = temps.toDS

I don't know why Spark searches for an encoder for java.time.LocalDate; I provide implicit conversions between TemperatureRow and EncodedTemperatureRow...

Jacek Laskowski
JimyRyan
    The implicit conversions you've supplied are simply useless for Spark - how would the framework "know" to convert objects into `EncodedTemperatureRow`? Spark requires implicit values with type `org.apache.spark.sql.Encoder[T]` to encode values of type `T`, so you'd need to supply an implicit `Encoder[TemperatureRow]`. Creating such encoders is not trivial, see https://stackoverflow.com/a/39442829/5344058 – Tzach Zohar Jul 19 '17 at 14:33

1 Answer

16

java.time.LocalDate is not supported up to and including Spark 2.2 (and I've been trying to write an Encoder for the type for some time, without success).

You have to convert java.time.LocalDate to some other supported type, e.g. java.sql.Timestamp or java.sql.Date, an epoch value, or a date-time string.
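A minimal sketch of that workaround, reusing the question's case classes. The conversions are applied explicitly with `map` rather than relying on implicit conversions (which Spark never triggers); the Spark calls themselves are shown as comments since they need a live SparkSession:

```scala
import java.time.LocalDate

case class Location(lat: Double, lon: Double)
case class TemperatureRow(date: LocalDate, location: Location, temperature: Double)
case class EncodedTemperatureRow(date: String, location: Location, temperature: Double)

// Convert the unsupported LocalDate field to a String before encoding...
def encode(t: TemperatureRow): EncodedTemperatureRow =
  EncodedTemperatureRow(t.date.toString, t.location, t.temperature)

// ...and parse it back after collecting (ISO-8601 round-trips losslessly)
def decode(e: EncodedTemperatureRow): TemperatureRow =
  TemperatureRow(LocalDate.parse(e.date), e.location, e.temperature)

// Alternatively, java.sql.Date is a supported type and converts directly:
def toSqlDate(t: TemperatureRow): java.sql.Date = java.sql.Date.valueOf(t.date)

// With a SparkSession `spark` in scope, you would then write (sketch):
//   import spark.implicits._
//   val tempsDS = sc.parallelize(s).map(encode).toDS  // Dataset[EncodedTemperatureRow]
//   tempsDS.collect().map(decode)                     // back to TemperatureRow on the driver
```

Note that mapping the Dataset itself back with `decode` would fail for the same reason as before: the result type would again need an `Encoder[TemperatureRow]`.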

Jacek Laskowski
    And to clarify for anyone landing here: not even version 2.2 of Spark handles the JDK8 date/time classes. The conversion in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala only look at the pre-JDK8 date/time classes. – wishihadabettername Oct 03 '17 at 19:15
  • so how do you do that? i.e. code plz – CpILL Mar 07 '19 at 05:49
  • You simply `map` or `withColumn` to some other type to avoid this issue. – Jacek Laskowski Mar 07 '19 at 10:34
  • 2
    It seems like Spark 3 will support it. See https://issues.apache.org/jira/browse/SPARK-27222 and https://github.com/apache/spark/commit/0f4f8160e6d01d2e263adcf39d53bd0a03fc1b73#diff-f52e4a77ff9291d86359d609a9757781 – dvir Jul 30 '19 at 18:10
  • Any comments on why [it](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExpressionEncoder-LocalDateTime.html) failed? – jack Oct 03 '20 at 09:52