
I want to transform a DataFrame that is read in from JSON into a Dataset of a given class. So far that has worked pretty well, as long as I was able to write my own case classes.

import spark.implicits._   // provides the implicit encoder for case classes

case class MyCaseClass(...)
val df = spark.read.json("path/to/json")
val ds = df.as[MyCaseClass]

def myFunction(input: MyCaseClass): MyCaseClass = {
    // Do some validation and things
    input
}

ds.map(myFunction)

However, now I am bound to external Java classes (specifically ones created by Thrift). So here is a more concrete example with a custom class:

Json:

{"a":1,"b":"1","wrapper":{"inside":"1.1", "map": {"k": "v"}}}
{"a":2,"b":"2","wrapper":{"inside":"2.1", "map": {"k": "v"}}}
{"a":3,"b":"3","wrapper":{"inside":"3.1", "map": {"k": "v"}}}

Class:

class MyInnerClass(var inside: String, var map: Map[String, String]) extends java.io.Serializable {
  def getInside(): String = {inside}
  def setInside(newInside: String) {inside = newInside}
  def getMap(): Map[String, String] = {map}
  def setMap(newMap: Map[String, String]) {map = newMap}
}

class MyClass(var a: Int, var b: String, var wrapper: MyInnerClass)  extends java.io.Serializable {
  def getA(): Int = {a}
  def setA(newA: Int) {a = newA}
  def getB(): String = {b}
  def setB(newB: String) {b = newB}
  def getWrapper(): MyInnerClass = {wrapper}
  def setWrapper(newWrapper: MyInnerClass) {wrapper = newWrapper}
}

So I want to do:

val json = spark.read.json("path/to/json")
json.as[MyClass]

However, that throws:

Unable to find encoder for type stored in a Dataset.  Primitive type (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

So I found out about custom Encoders (here and here):

import org.apache.spark.sql.Encoders
val kryoMyClassEncoder  = Encoders.kryo[MyClass]
json.as[MyClass](kryoMyClassEncoder)

Which throws:

Try to map struct<a:bigint,b:string,wrapper:struct<inside:string,map:struct<k:string>>> to Tuple1, but failed as the number of fields does not line up

So how can I convert a DataFrame into a Dataset of a custom class?

Dominik Müller
  • Any luck on that? Could you please share how you got it working? – Chaitanya Chandurkar Aug 30 '17 at 21:23
  • Same problem here. I have a workaround using Java classes (and enums), but I would love to see it working with Scala classes and enums as well. – jurgispods Sep 27 '17 at 13:06
  • 1
    I faced the same issue in a similar context. In my case, I had the case class created in the same method where I was reading the JSON. I moved the case class definition to an outer scope and it curiously worked. Serendipitously I stumbled upon this solution while browsing for [another](https://stackoverflow.com/questions/33704831/value-todf-is-not-a-member-of-org-apache-spark-rdd-rdd) error. – Felipe Martins Melo Nov 29 '17 at 23:34

3 Answers


Instead of using the kryo encoder, try using the product encoder, i.e.:

val productMyClassEncoder = Encoders.product[MyClass]
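
For reference, a minimal sketch of what this looks like end to end, assuming MyClass and MyInnerClass are rewritten as case classes (the product encoder only supports Product types, i.e. case classes and tuples); the field types below are assumptions matching the JSON above, not the thrift classes:

import org.apache.spark.sql.{Encoders, SparkSession}

// Hypothetical case-class equivalents of the thrift-generated classes
case class MyInnerClass(inside: String, map: Map[String, String])
case class MyClass(a: Long, b: String, wrapper: MyInnerClass)

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val myClassEncoder = Encoders.product[MyClass]

// Passing the encoder's schema to the reader keeps "map" a MapType
// instead of the struct<k:string> that schema inference would produce
val ds = spark.read
  .schema(myClassEncoder.schema)
  .json("path/to/json")
  .as[MyClass](myClassEncoder)

ds.show(false)

With import spark.implicits._ in scope the explicit encoder argument can be dropped, since a product encoder is derived implicitly for case classes.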
Dan Osipov
  • Worked for me only when I both used the product encoder and moved the case class definition into an outer scope. – Egor Kraev Sep 10 '18 at 11:34

I had the same problem (nothing else helped) when the case class was declared inside a method. After moving the case class outside the method, import spark.implicits._ worked correctly.
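
As a sketch of the layout that works (the names and fields below are assumptions, not from the original post):

import org.apache.spark.sql.SparkSession

// The case class must live outside the method (top level or in a companion
// object) so the compiler can produce a TypeTag for it and Spark can derive
// an implicit Encoder via spark.implicits._
case class MyCaseClass(a: Long, b: String)

object Job {
  def run(spark: SparkSession): Unit = {
    import spark.implicits._
    // Defining MyCaseClass here, inside run(), is what breaks encoder derivation
    val ds = spark.read.json("path/to/json").as[MyCaseClass]
    ds.show()
  }
}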

S.Daineko

While reading the data as JSON, when using kryo serialization we need to set the schema to a single BinaryType field with the column name "value":

import org.apache.spark.sql.types.{BinaryType, StructType}

// The schema must be set on the DataFrameReader before reading
val json = spark.read.schema(new StructType().add("value", BinaryType)).json("path/to/json")
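
For completeness, a sketch of one way such a kryo-encoded Dataset can be produced, assuming the JSON is first parsed with the inferred schema and then mapped into MyClass objects by hand (the row-access code below is an assumption, not part of the original answer):

import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// With a kryo encoder the resulting Dataset has a single binary "value" column
implicit val kryoMyClassEncoder: Encoder[MyClass] = Encoders.kryo[MyClass]

val ds = spark.read.json("path/to/json").map { row: Row =>
  val wrapperRow = row.getAs[Row]("wrapper")
  val mapRow = wrapperRow.getAs[Row]("map")
  // Schema inference reads {"k": "v"} as struct<k:string>, so rebuild the Map by hand
  val mapValues = mapRow.schema.fieldNames.map(name => name -> mapRow.getAs[String](name)).toMap
  new MyClass(
    row.getAs[Long]("a").toInt,
    row.getAs[String]("b"),
    new MyInnerClass(wrapperRow.getAs[String]("inside"), mapValues)
  )
}

ds.printSchema()   // root
                   //  |-- value: binary (nullable = true)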
Thejesh