
I have a problem with Spark that I already outlined in *spark custom kryo encoder not providing schema for UDF*, but I have now created a minimal sample: https://gist.github.com/geoHeil/dc9cfb8eca5c06fca01fc9fc03431b2f

import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._  // spark: the SparkSession, as in the Spark shell

class SomeOtherClass(foo: Int)  // deliberately not a case class, so no Product encoder can be derived
case class FooWithSomeOtherClass(a: Int, b: String, bar: SomeOtherClass)
case class FooWithoutOtherClass(a: Int, b: String, bar: Int)
case class Foo(a: Int)
implicit val someOtherClassEncoder: Encoder[SomeOtherClass] = Encoders.kryo[SomeOtherClass]
val df2 = Seq(FooWithSomeOtherClass(1, "one", new SomeOtherClass(4))).toDS
val df3 = Seq(FooWithoutOtherClass(1, "one", 1), FooWithoutOtherClass(2, "two", 2)).toDS
val df4 = df3.map(d => FooWithSomeOtherClass(d.a, d.b, new SomeOtherClass(d.bar)))

Here, even creating the dataset (the `toDS` call for `df2`) fails with:

java.lang.UnsupportedOperationException: No Encoder found for SomeOtherClass
- field (class: "SomeOtherClass", name: "bar")
- root class: "FooWithSomeOtherClass"

Why is the encoder not in scope or at least not in the right scope?

Also, trying to specify an explicit encoder like:

df3.map(d => {FooWithSomeOtherClass(d.a, d.b, new SomeOtherClass(d.bar))}, (Int, String, Encoders.kryo[SomeOtherClass]))

does not work.
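
As far as I can tell, `Dataset.map` takes the encoder in a separate implicit parameter list, so an explicit encoder would presumably have to be a single `Encoder[FooWithSomeOtherClass]` rather than per-field encoders. A sketch of my understanding (not working code):

  // Dataset.map's signature takes the encoder as a separate implicit parameter:
  //   def map[U](func: T => U)(implicit encoder: Encoder[U]): Dataset[U]
  df3.map(d => FooWithSomeOtherClass(d.a, d.b, new SomeOtherClass(d.bar)))(
    ???) // a single Encoder[FooWithSomeOtherClass] would have to go here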

Georg Heiler

  • If you want more than just a binary blob, take a look at [this answer](https://stackoverflow.com/q/36648128/3072788). – Alec Jul 10 '17 at 16:02

1 Answer


This happens because the Kryo encoder has to be used for the whole serialization stack: the top-level object itself needs a Kryo encoder, not just one of its fields. The following runs successfully in a local Spark shell (the change you are interested in is on the first line):

  // The key change: a Kryo encoder for the top-level class,
  // not just for the SomeOtherClass field.
  implicit val topLevelObjectEncoder: Encoder[FooWithSomeOtherClass] = Encoders.kryo[FooWithSomeOtherClass]

  val df1 = Seq(Foo(1), Foo(2)).toDF

  val df2 = Seq(FooWithSomeOtherClass(1, "one", new SomeOtherClass(4))).toDS

  val df3 = Seq(FooWithoutOtherClass(1, "one", 1), FooWithoutOtherClass(2, "two", 2)).toDS
  df3.printSchema
  df3.show

  // With the implicit top-level encoder in scope, map resolves an Encoder[FooWithSomeOtherClass].
  val df4 = df3.map(d => FooWithSomeOtherClass(d.a, d.b, new SomeOtherClass(d.bar)))
  df4.printSchema
  df4.show
  df4.collect
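
Note that a top-level Kryo encoder serializes the whole object into a single opaque column, so `df4.printSchema` should print something along these lines (expected output, not copied from a run):

  root
   |-- value: binary (nullable = true)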
stefanobaghino

  • I see. But this transforms the whole schema into a single binary column. I would rather keep multiple (original) columns where only a single one is serialized via Kryo as binary. Is this possible as well? I have seen some Java projects explicitly serializing like `Encoders.tuple(Encoders.STRING(), Encoders.kryo(Mything.class))`, which seems to provide the desired behaviour. However, I do not yet know how to reproduce this in the Scala API. – Georg Heiler Jul 10 '17 at 15:30
  • I guess that you are unable to make this a `case class`, either directly or by copying its information, right? – stefanobaghino Jul 10 '17 at 15:42
  • It would be possible, at least for parts of the desired functionality. However, looking at it from a usability standpoint, it would be great if I could simply apply Kryo to the one single field which itself is not a product. – Georg Heiler Jul 10 '17 at 15:45
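
A sketch of the tuple-encoder approach mentioned in the comments, untested and assuming the definitions from the question: map to a plain tuple and pass a composite encoder explicitly, so that only the `SomeOtherClass` column ends up as Kryo-serialized binary while the other columns keep their native types:

  // Composite encoder: native encoders for the plain fields,
  // Kryo only for the field that is not a Product.
  val tupleEncoder: Encoder[(Int, String, SomeOtherClass)] =
    Encoders.tuple(Encoders.scalaInt, Encoders.STRING, Encoders.kryo[SomeOtherClass])

  val df5 = df3.map(d => (d.a, d.b, new SomeOtherClass(d.bar)))(tupleEncoder)
  df5.printSchema  // expected: _1 integer, _2 string, _3 binary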