I want to transform a DataFrame that is read in from JSON into a Dataset of a given class. So far, that has worked well whenever I could write my own case classes:
import spark.implicits._ // provides encoders for case classes
case class MyCaseClass(...)
val df = spark.read.json("path/to/json")
val ds = df.as[MyCaseClass]
def myFunction(input: MyCaseClass): MyCaseClass = {
  // Do some validation and other processing
  input
}
ds.map(myFunction)
However, now I am bound to external Java classes (specifically, ones generated by Thrift). Here is a more concrete example with a custom class:
JSON:
{"a":1,"b":"1","wrapper":{"inside":"1.1", "map": {"k": "v"}}}
{"a":2,"b":"2","wrapper":{"inside":"2.1", "map": {"k": "v"}}}
{"a":3,"b":"3","wrapper":{"inside":"3.1", "map": {"k": "v"}}}
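For reference, Spark's JSON schema inference turns every nested JSON object into a struct, so `wrapper.map` is read as `struct<k:string>` rather than as a map (the error message for the Kryo attempt shows exactly this). A minimal sketch of an explicit schema that declares `map` as a `MapType` instead, assuming the field types of the classes below; the `JsonSchema` object name is hypothetical:

```scala
import org.apache.spark.sql.types._

// Hypothetical explicit schema for the JSON lines above.
// Without it, schema inference reads the "map" object as a struct.
object JsonSchema {
  // Nested "wrapper" object: "map" is declared as a real string-to-string map
  val inner: StructType = new StructType()
    .add("inside", StringType)
    .add("map", MapType(StringType, StringType))

  // Top level: "a" as int (assumption: matches the Int field, not the inferred bigint)
  val full: StructType = new StructType()
    .add("a", IntegerType)
    .add("b", StringType)
    .add("wrapper", inner)
}
```

It would be passed to the reader as `spark.read.schema(JsonSchema.full).json("path/to/json")`. Declaring `a` as `IntegerType` instead of the inferred `bigint` is an assumption chosen to match the `Int` field in `MyClass`.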
Classes:
class MyInnerClass(var inside: String, var map: Map[String, String]) extends java.io.Serializable {
  def getInside(): String = inside
  def setInside(newInside: String): Unit = { inside = newInside }
  def getMap(): Map[String, String] = map
  def setMap(newMap: Map[String, String]): Unit = { map = newMap }
}
class MyClass(var a: Int, var b: String, var wrapper: MyInnerClass) extends java.io.Serializable {
  def getA(): Int = a
  def setA(newA: Int): Unit = { a = newA }
  def getB(): String = b
  def setB(newB: String): Unit = { b = newB }
  def getWrapper(): MyInnerClass = wrapper
  def setWrapper(newWrapper: MyInnerClass): Unit = { wrapper = newWrapper }
}
So I want to do:
val json = spark.read.json("path/to/json")
json.as[MyClass]
However, that throws:
Unable to find encoder for type stored in a Dataset. Primitive type (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
So I looked into custom encoders (here and here):
import org.apache.spark.sql.Encoders
val kryoMyClassEncoder = Encoders.kryo[MyClass]
json.as[MyClass](kryoMyClassEncoder)
Which throws:
Try to map struct<a:bigint,b:string,wrapper:struct<inside:string,map:struct<k:string>>> to Tuple1, but failed as the number of fields does not line up
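One likely reason for this error: a Kryo encoder does not map columns onto fields at all. It stores each object as a single opaque binary column named `value`, so `as[...]` expects a one-column input, while the JSON DataFrame has three top-level columns (`a`, `b`, `wrapper`). A small sketch that prints the encoder's schema, using a hypothetical stand-in class `Opaque` (any non-case class should behave the same way):

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical stand-in for a Thrift-generated (non-Product) class
class Opaque(var x: Int) extends java.io.Serializable

object KryoSchemaDemo {
  // The Kryo encoder serializes the whole object into one binary column
  val enc: Encoder[Opaque] = Encoders.kryo[Opaque]

  def main(args: Array[String]): Unit =
    // Shows a single field named "value" of type binary
    println(enc.schema.simpleString)
}
```

This suggests that a Kryo encoder is suited to objects created in Scala code (for example, as the result of a `map`), not to reading structured JSON columns directly into fields.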
So how can I convert a DataFrame to a Dataset of a custom (non-case-class) type?