
Using Spark 1.6.0. Say I have a class like this:

case class MyClass(date: java.util.Date, oid: org.bson.types.ObjectId)

and I have:

//rdd: RDD[MyClass]
rdd.toDF("date", "oid")

I get `java.lang.UnsupportedOperationException: Schema for type java.util.Date / org.bson.types.ObjectId is not supported`.

Now, I know I can make it a java.sql.Date, but let's say MyClass is depended upon in too many other places to make that change everywhere, and that still wouldn't solve the ObjectId problem.

I am also aware of the UserDefinedType option, but it seems like that only works if you also create a new class to work with it (and again, the signature of MyClass needs to stay the same).

Is there not a way to just register a serializer/deserializer for java.util.Date and org.bson.types.ObjectId so that I can call toDF on the RDD[MyClass] and it will just work?

UPDATE

This doesn't exactly answer my question, but it unblocked us, so I'll share it here in the hope that it's helpful for someone else. Most JSON libraries support this use case, and spark-sql has a built-in `sqlContext.read.json(stringRdd).write.parquet("/path/to/output")`. So you can define the (de)serialization for the class using your JSON library of choice, serialize each record to a string, and Spark SQL can handle the rest.
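
For concreteness, something along these lines (json4s is used here, but any JSON library with pluggable serializers works; the ObjectId serializer and the writeAsParquet helper are things you write yourself, not built-ins):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.bson.types.ObjectId
import org.json4s.{CustomSerializer, DefaultFormats, JString}
import org.json4s.jackson.Serialization.write

// Teach the JSON library how to handle the type Spark SQL can't infer.
object ObjectIdSerializer extends CustomSerializer[ObjectId](_ => (
  { case JString(hex) => new ObjectId(hex) },
  { case oid: ObjectId => JString(oid.toHexString) }
))

def writeAsParquet(sqlContext: SQLContext, rdd: RDD[MyClass], path: String): Unit = {
  val jsonRdd: RDD[String] = rdd.mapPartitions { iter =>
    // Formats built per partition to avoid closure-serialization issues.
    implicit val formats = DefaultFormats + ObjectIdSerializer
    iter.map(mc => write(mc))
  }
  sqlContext.read.json(jsonRdd).write.parquet(path)  // schema inferred from the JSON
}

Note that with this route java.util.Date ends up as a string column in the inferred schema.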

practechal

1 Answer


It depends on what you mean by "just work". To serialize/deserialize an object, all you need is a corresponding UserDefinedType and the proper annotation. For example, something like this:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.bson.types.ObjectId

@SQLUserDefinedType(udt = classOf[MyClassUDT])
case class MyClass(date: java.util.Date, oid: ObjectId)

class MyClassUDT extends UserDefinedType[MyClass] {
  // Catalyst-level schema used to store MyClass values.
  override def sqlType: StructType = StructType(Seq(
    StructField("date", DateType, nullable = false),
    StructField("oid", StringType, nullable = false)
  ))

  override def serialize(obj: Any): InternalRow = {
    obj match {
      case MyClass(date, oid) =>
        val row = new GenericMutableRow(2)
        // DateType is stored internally as days since the epoch (an Int).
        row(0) = DateTimeUtils.fromJavaDate(new java.sql.Date(date.getTime))
        row(1) = UTF8String.fromString(oid.toString)
        row
    }
  }

  override def deserialize(datum: Any): MyClass = {
    datum match {
      case row: InternalRow =>
        val date = new java.util.Date(DateTimeUtils.toJavaDate(row.getInt(0)).getTime)
        val oid = new ObjectId(row.getString(1))
        MyClass(date, oid)
    }
  }

  override def userClass: Class[MyClass] = classOf[MyClass]
}

It doesn't mean that you'll be able to access methods defined on the class, or any of its fields, from SQL expressions. To be able to do that you'll need UDFs.
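
For example, something along these lines (assuming a DataFrame df that already has a column "mc" of type MyClassUDT, e.g. obtained by nesting MyClass in another case class):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Spark stores the column in the UDT's sqlType; calling a UDF deserializes it
// back to MyClass, so ordinary methods and fields become reachable again.
val oidHex = udf((mc: MyClass) => mc.oid.toHexString)

def withOidHex(df: DataFrame): DataFrame =
  df.withColumn("oidHex", oidHex(df("mc")))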

A little closer to seamless integration are Spark Datasets, but AFAIK it is not yet possible to define custom encoders.

zero323
  • Interesting, hadn't thought about annotating the whole class. So, some of the detail I left out for the sake of the question is that there are LOTS of `MyClass`es and they all have LOTS more fields. It would be prohibitively tedious to do this for all of them (and to maintain them as they change; fields are growing every month). For all the other serialization libraries (json4s, salat, (un)marshallers in spray, etc.) we're using with these classes, there's always a mechanism to just say "for `this` class, apply `that` function, and do the default thing for everything else". Hoping for that here – practechal Feb 05 '16 at 21:38
  • For instance, just being able to do `class ObjectIdUDT extends UserDefinedType[ObjectId] { def serialize(oid: ObjectId) = oid.toHexString() def deserialize(hexString: String) = new ObjectId(hexString) ... }` without needing to add `@SQLUserDefinedType` to the definition of `ObjectId`, and then just register `ObjectIdUDT` with the `sqlContext` or something manually? – practechal Feb 05 '16 at 21:43
  • Well, it is not impossible to achieve (it is more or less what happens when you use case classes with supported types to create DFs / Datasets), but it is rather complicated in the general case. Moreover, quite a lot depends on how you want to access this data later. One way or another, if you're asking about existing solutions that can do something like this, I am not aware of any. – zero323 Feb 07 '16 at 17:04