
In Spark 2.0, the one example I've found of creating a UDT in Scala seems to no longer be applicable. The UserDefinedType class has been set as private, with the comment:

Note: This was previously a developer API in Spark 1.x. We are making this private in Spark 2.0 because we will very likely create a new version of this that works better with Datasets.

UDTRegistration might be intended as the new mechanism for declaring UDTs, but it is also private.

So far, my research tells me that there is no way to declare your own UDTs in Spark 2.0; is this conclusion correct?

Anders Olsson
  • It is correct. As of Spark 2.0 the former UDT mechanism has been removed due to compatibility issues, and for now there is no replacement. – zero323 Aug 23 '16 at 11:50

2 Answers


You can get UDTs to work with Spark using UDTRegistration, but you have to use a private API to do it, which may not be supported in the future. Use this approach with great caution and only when absolutely necessary. For some use cases, unfortunately, there is no other option.

Say you want to use a Polymorphic Record:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

// Ideally, you want to be able to filter a Dataset on the polymorphic field, e.g.:
polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

You can write a custom UDT that encodes everything to bytes (I'm using Java serialization here, but it's probably better to delegate to Spark's configured Kryo serializer; see the sketch after the usage example below).

First define the UDT class:

// NOTE: UserDefinedType is private[spark] in Spark 2.x, so this class has to
// live in a package under org.apache.spark.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  // The underlying SQL type is just a blob of bytes.
  override def sqlType: DataType = BinaryType

  // Java-serialize the object into a byte array.
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  // Deserialize the byte array back into a CustomPoly.
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    ois.close()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Then register it:

// NOTE: UDTRegistration is also private[spark], so the file you do this in has to be inside the org.apache.spark package!
import org.apache.spark.sql.types.UDTRegistration
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Then you can use it!

// As shown above:
case class UsingPoly(id: Int, poly: CustomPoly)

import spark.implicits._ // spark is your SparkSession; needed for .toDS()

val polySeq = Seq(
  UsingPoly(1, FooPoly(1)),
  UsingPoly(2, BarPoly("Blah", 123)),
  UsingPoly(3, FooPoly(1))
).toDS()

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
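
Regarding the Kryo remark above: here is a minimal, hedged sketch of the same UDT delegating to whatever serializer the running SparkEnv was configured with (Kryo if spark.serializer is set to KryoSerializer). The class name CustomPolySparkSerUDT is purely illustrative, the same private-API and org.apache.spark-package caveats apply, and you would register it with UDTRegistration exactly like the class above.

import java.nio.ByteBuffer

import org.apache.spark.SparkEnv
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

class CustomPolySparkSerUDT extends UserDefinedType[CustomPoly] {
  override def sqlType: DataType = BinaryType

  // Grab a fresh instance of the serializer SparkEnv was configured with
  // (KryoSerializer if spark.serializer points at it); SerializerInstance is
  // not guaranteed to be thread-safe, so create one per call.
  private def serializerInstance = SparkEnv.get.serializer.newInstance()

  override def serialize(obj: CustomPoly): Any = {
    val buffer: ByteBuffer = serializerInstance.serialize(obj)
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    bytes
  }

  override def deserialize(datum: Any): CustomPoly =
    serializerInstance.deserialize[CustomPoly](
      ByteBuffer.wrap(datum.asInstanceOf[Array[Byte]]))

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}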

Check out my original post here; it has an additional example: How to store custom objects in Dataset?

Edit: This post was down-voted for understandable reasons. I have added a caveat emptor at the top, hopefully to prevent misunderstandings.

  • As said by the OP, "The UserDefinedType class has been set as private", so this does not work in Spark 2.0. – unautre May 15 '19 at 14:02

Well, you are right: for now, Spark 2.x no longer offers any kind of UDT API like the one in Spark 1.x.

You can see in the ticket SPARK-14155 that it was made private to make room for a new API. There has also been a ticket open since Spark 1.5, SPARK-7768, which will hopefully be resolved in Spark 2.2.

So the type API is not in good shape for creating your own UDT right now, but there are a few tricks for putting custom objects into a Dataset. Here is one example.
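
In that spirit, here is a minimal sketch of one commonly used trick, assuming Spark 2.x and reusing the CustomPoly classes from the answer above (the object name and session setup are illustrative): fall back to a binary Kryo Encoder for the class.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object KryoEncoderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kryo-encoder").getOrCreate()

    // Encode CustomPoly values as a single opaque binary column.
    implicit val customPolyEncoder: Encoder[CustomPoly] = Encoders.kryo[CustomPoly]

    val ds = spark.createDataset(Seq[CustomPoly](FooPoly(1), BarPoly("Blah", 123L)))

    // The objects round-trip fine, but the column is just bytes, so the
    // filter runs object-by-object on the JVM side, not as Catalyst expressions.
    val foos = ds.filter((p: CustomPoly) => p match {
      case FooPoly(id) => id == 1
      case _           => false
    })
    foos.collect().foreach(println)

    spark.stop()
  }
}

The obvious downside is that the encoded column is opaque to Spark SQL, so you lose things like column pruning and predicate pushdown on the fields inside it.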

nefo_x
Thiago Baldim