I have an algebraic data type that I want to use as a field in a case class. The ADT looks like this:
sealed abstract class DayOfWeek(val id: String)

object DayOfWeek {
  final object Sunday extends DayOfWeek("sunday")
  final object Monday extends DayOfWeek("monday")
  final object Tuesday extends DayOfWeek("tuesday")
  final object Wednesday extends DayOfWeek("wednesday")
  final object Thursday extends DayOfWeek("thursday")
  final object Friday extends DayOfWeek("friday")
  final object Saturday extends DayOfWeek("saturday")

  val members: List[DayOfWeek] =
    List(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday)

  def apply(id: String): DayOfWeek =
    members
      .map(member => (member.id, member))
      .toMap
      .apply(id)
}
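To be concrete about the behavior I rely on: each member round-trips through its `id`. A small self-contained check (the ADT is repeated here, minus the redundant `final` modifiers, so the snippet compiles on its own):

```scala
sealed abstract class DayOfWeek(val id: String)

object DayOfWeek {
  object Sunday extends DayOfWeek("sunday")
  object Monday extends DayOfWeek("monday")
  object Tuesday extends DayOfWeek("tuesday")
  object Wednesday extends DayOfWeek("wednesday")
  object Thursday extends DayOfWeek("thursday")
  object Friday extends DayOfWeek("friday")
  object Saturday extends DayOfWeek("saturday")

  val members: List[DayOfWeek] =
    List(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday)

  // Look up a member by its id; throws if the id is unknown
  def apply(id: String): DayOfWeek =
    members.map(member => (member.id, member)).toMap.apply(id)
}

object DayOfWeekDemo {
  def main(args: Array[String]): Unit = {
    // Every member round-trips through its id back to the same singleton
    assert(DayOfWeek.members.forall(m => DayOfWeek(m.id) eq m))
  }
}
```

This is exactly the string-to-object mapping I want Spark to use when encoding and decoding the type.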
I have seen answers here that say there is no good way to do this, like this one. But I don't believe that is the case.
There seems to be a path using UserDefinedType and UDTRegistration.register. Those are marked private as of Spark 2.x, but I tried using them from the org.apache.spark namespace, which gets around the access restriction. However, when I try to call .toDS on a Seq[DayOfWeek], even after calling register, the compiler still says value toDS is not a member of Seq[DayOfWeek]. So it's not picking up that registration.
package org.apache.spark

import org.apache.spark.sql.types.{DataType, StringType, UDTRegistration, UserDefinedType}

object DayOfWeekUDT {
  def register(): Unit =
    UDTRegistration.register(classOf[DayOfWeek].getName, classOf[DayOfWeekUDT].getName)
}

class DayOfWeekUDT extends UserDefinedType[DayOfWeek] {
  // Store the day as its string id in Spark SQL
  override def sqlType: DataType = StringType

  override def serialize(obj: DayOfWeek): Any =
    org.apache.spark.unsafe.types.UTF8String.fromString(obj.id)

  override def deserialize(datum: Any): DayOfWeek = DayOfWeek(datum.toString)

  override def userClass: Class[DayOfWeek] = classOf[DayOfWeek]
}
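For context, this is roughly how I'm trying to use it (a sketch, not working code: it assumes `DayOfWeek` is on the classpath and `spark` is a local `SparkSession`; the marked line is the one that fails to compile):

```
import org.apache.spark.DayOfWeekUDT
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

DayOfWeekUDT.register()

// Fails: value toDS is not a member of Seq[DayOfWeek]
val ds = Seq[DayOfWeek](DayOfWeek.Monday, DayOfWeek.Friday).toDS()
```

My understanding is that `.toDS` requires an implicit `Encoder[DayOfWeek]` in scope, and registering the UDT alone doesn't appear to bring one in via `spark.implicits._`.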
There is also the option of creating an implicit val of type Encoder[DayOfWeek] using ExpressionEncoder. I searched all of GitHub for examples. The few I could find didn't apply to my specific need, and I couldn't understand them well enough to make my own version work. This SHOULD work with Spark 2.x (in my case 2.4.x); it's just a matter of figuring out how to use the tool. This is what I tried, placed inside the DayOfWeek companion object:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, Expression, Literal, UpCast}
import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import scala.reflect.ClassTag

private val clazz: Class[DayOfWeek] = classOf[DayOfWeek]

// Reference to the incoming DayOfWeek object
private val inputObject: BoundReference = BoundReference(0, ObjectType(clazz), nullable = false)

// Serializer side: call UTF8String.fromString(obj.id)
private val converter = StaticInvoke(
  classOf[UTF8String],
  StringType,
  "fromString",
  Invoke(inputObject, "id", ObjectType(classOf[String])) :: Nil
)

private val serializer: Seq[Expression] =
  CreateNamedStruct(Literal("day_of_week") :: converter :: Nil).flatten

// Deserializer side: call DayOfWeek.apply on the stored string
private val deserializer: Expression = StaticInvoke(
  staticObject = DayOfWeek.getClass,
  dataType = ObjectType(clazz),
  functionName = "apply",
  arguments = Invoke(
    targetObject = UpCast(
      child = GetColumnByOrdinal(0, StringType),
      dataType = StringType,
      walkedTypePath = "- root class: DayOfWeek" :: Nil
    ),
    functionName = "id",
    dataType = ObjectType(classOf[String])
  ) :: Nil,
  propagateNull = false,
  returnNullable = false
)

implicit val encoder: Encoder[DayOfWeek] = new ExpressionEncoder[DayOfWeek](
  schema = StructType(Seq(StructField("id", StringType, nullable = false))),
  flat = true,
  serializer = serializer,
  deserializer = deserializer,
  clsTag = ClassTag(classOf[DayOfWeek])
)
Does anyone have any idea how to do this properly? I like the UserDefinedType concept, in that the conversion back and forth between the Spark data type and the ADT is very clear. The ExpressionEncoder, on the other hand, almost looks like it was written to be as cryptic as possible.