You can get UDTs to work with Spark using UDTRegistration, but doing so means relying on a private API that may not be supported in the future. Use this approach with great caution and only when absolutely necessary; for some use cases, unfortunately, there is no other option.
Say you want to use a Polymorphic Record:
trait CustomPoly
case class FooPoly(id: Int) extends CustomPoly
case class BarPoly(value: String, secondValue: Long) extends CustomPoly
Given a Dataset of records that wrap it (call it polySeq; it's built in the final snippet below), you'd like to be able to filter on the concrete subtype:

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
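Out of the box this doesn't work: Spark has no encoder for a trait like CustomPoly, so deriving the Dataset fails before you ever get to the filter. A minimal sketch of the failure (the exact message varies by Spark version):

case class UsingPoly(id: Int, poly: CustomPoly)

// Without the UDT registered below, encoder derivation fails at .toDS(),
// typically with something along the lines of:
//   java.lang.UnsupportedOperationException: No Encoder found for CustomPoly
val broken = Seq(UsingPoly(1, FooPoly(1))).toDS()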
You can write a custom UDT that encodes everything to bytes (I'm using Java serialization here, but it's probably better to hook into Spark's Kryo context).
First define the UDT class:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  // The UDT is stored in the underlying row as raw bytes.
  override def sqlType: DataType = BinaryType

  // Java-serialize the object into a byte array.
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close() // flush buffered data before reading the bytes out
    bos.toByteArray
  }

  // Read the byte array back into a CustomPoly.
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    ois.close()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
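Since I mentioned Kryo above: here's a rough sketch of what a Kryo-backed variant could look like. The class name CustomPolyKryoUDT is my own, and depending on your Kryo version you may need to register classes explicitly or configure an instantiator strategy for classes without no-arg constructors; treat this as a starting point, not a drop-in implementation.

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

class CustomPolyKryoUDT extends UserDefinedType[CustomPoly] {
  // Kryo instances are not thread-safe and are costly to create;
  // a real implementation would pool them or use a ThreadLocal.
  @transient private lazy val kryo: Kryo = {
    val k = new Kryo()
    k.setRegistrationRequired(false) // or register FooPoly/BarPoly explicitly
    k
  }

  override def sqlType: DataType = BinaryType

  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val output = new Output(bos)
    kryo.writeClassAndObject(output, obj)
    output.close()
    bos.toByteArray
  }

  override def deserialize(datum: Any): CustomPoly = {
    val input = new Input(datum.asInstanceOf[Array[Byte]])
    try kryo.readClassAndObject(input).asInstanceOf[CustomPoly]
    finally input.close()
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}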
Then register it:

// NOTE: The file you do this in has to be inside of the org.apache.spark
// package, because UDTRegistration (and UserDefinedType itself) are private[spark]!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
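For example, you can keep the package trick contained in one small registrar object (the object name and sub-package here are hypothetical, and I'm assuming CustomPoly and CustomPolyUDT are visible from it):

// Must live under org.apache.spark so the private[spark] members are accessible.
package org.apache.spark.custompoly

import org.apache.spark.sql.types.UDTRegistration

object CustomPolyRegistrar {
  // Call this once, before creating any Dataset that contains CustomPoly.
  def register(): Unit =
    UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
}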
Then you can use it!

// As shown above:
case class UsingPoly(id: Int, poly: CustomPoly)

import spark.implicits._ // assuming a SparkSession named spark is in scope

val polySeq = Seq(
  UsingPoly(1, FooPoly(1)),
  UsingPoly(2, BarPoly("Blah", 123)),
  UsingPoly(3, FooPoly(1))
).toDS()

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
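If everything is wired up, the filter keeps the two FooPoly(1) rows (ids 1 and 3) and drops the BarPoly row.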
Check out my original post here; it has an additional example:
How to store custom objects in Dataset?
Edit: This post was down-voted for understandable reasons. I've added a caveat emptor at the top, which will hopefully prevent misunderstandings.