
I'm writing a Spark application using version 2.1.1. The following code gets the error below when calling a method with a LocalDate parameter:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_2")
- root class: "scala.Tuple2"
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
....
val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._ 
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  runJob.run(d, date) 
})

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ 
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") 
    .as[Int] 
  }
}

Update: I changed the return type of runJob.run() to a tuple (Int, java.sql.Date) and changed the code in the lambda of .map(...) to

val processed = itemListJob.run(rc, priority).select("id").map(d => {
  val (a,b) = runJob.run(d, date) 
  $"$a, $b"
})

Now the error changed to

[error] C:\....\scala\main.scala:40: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases. 
[error]     val processed = itemListJob.run(rc, priority).map(d => { 
[error]                                                      ^ 
[error] one error found 
[error] (compile:compileIncremental) Compilation failed
  • Please add spark version and what serialization is used (if you change default). – Zernike May 24 '17 at 21:16
  • My spark version is 2.1.1 on my local development PC. I didn't change anything about serialization (default setup). – ca9163d9 May 24 '17 at 21:19
  • Change `runJob.run(d, date)` to return some class that Spark SQL understands, such as `java.util.Date`. – zsxwing May 24 '17 at 21:31
  • @zsxwing Thanks, I changed the code as you suggested. However, it now gets a new error. I tried adding `import sqlContext.implicits._` in the lambda passed to `map()`, but it didn't help. – ca9163d9 May 24 '17 at 21:58
  • The `import` statement should not be added inside the lambda; it needs to be in scope where `map` is called. Just add it above the line `val processed = itemListJob.run(rc, priority).map(d => {`. – zsxwing May 24 '17 at 22:08
  • Yes, it already had `import sqlContext.implicits._` right before the line `val processed = itemListJob.run(rc, priority).map(d => {`. But it still gets the error? – ca9163d9 May 24 '17 at 22:09
  • @zsxwing, never mind. The error was caused by my writing `$"..."` instead of `s"..."` (a sketch of the corrected lambda follows this thread). – ca9163d9 May 24 '17 at 22:20
  • @zsxwing, the code runs successfully now. Now I have a question about the parallelism. The question is https://stackoverflow.com/questions/44169588/parallelism-rdd-parallelize-vs-dataset-map. Can you comment on it? Thanks a lot! – ca9163d9 May 24 '17 at 22:40
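
For reference, a minimal sketch of the version described as compiling in the thread above; `itemListJob`, `runJob`, `rc`, `priority`, and `date` are the question's own names and are assumed to be in scope:

```scala
import sqlContext.implicits._  // provides encoders for primitives, tuples, and case classes

val processed = itemListJob.run(rc, priority).select("id").map { d =>
  val (a, b) = runJob.run(d, date) // now returns (Int, java.sql.Date), both supported by Spark SQL
  s"$a, $b"                        // s"..." is plain string interpolation; $"..." would build a Column
}
```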

1 Answer


For a custom dataset type, you can use the Kryo serde framework, as long as your data is actually serializable (i.e. implements Serializable). Here is one example of using Kryo: Spark No Encoder found for java.io.Serializable in Map[String, java.io.Serializable].

Kryo is generally recommended since it's much faster and is also compatible with the Java serde framework. You can certainly choose Java native serialization (ObjectOutputStream/ObjectInputStream) instead, but it's much slower.

As the comments above note, Spark SQL comes with lots of useful Encoders under sqlContext.implicits._, but those won't cover everything, so you may have to plug in your own Encoder.

As I said, your custom data has to be serializable, and according to https://docs.oracle.com/javase/8/docs/api/java/time/LocalDate.html, LocalDate implements the Serializable interface, so you are good here.
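
A minimal sketch of what plugging in such an Encoder can look like (this mirrors the linked example rather than the question's exact code, and relies on Spark's `org.apache.spark.sql.Encoders` factory):

```scala
import java.time.LocalDate
import org.apache.spark.sql.{Encoder, Encoders}

// Kryo-backed encoder: the LocalDate is stored as a single binary column.
implicit val localDateEncoder: Encoder[LocalDate] = Encoders.kryo[LocalDate]

// A tuple mixing a primitive with the custom type needs an explicit tuple encoder.
implicit val pairEncoder: Encoder[(Int, LocalDate)] =
  Encoders.tuple(Encoders.scalaInt, Encoders.kryo[LocalDate])

// Java native serialization is also available, but is typically much slower:
// implicit val javaEncoder: Encoder[LocalDate] = Encoders.javaSerialization[LocalDate]
```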

linehrr