
Yesterday (practically the whole day) I tried to figure out an elegant way to represent a model with circular references in Scala/Spark SQL 2.2.1.

Let's say this is the original model approach, which of course does not work (keep in mind that the real model has tens of attributes):

case class Branch(id: Int, branches: List[Branch] = List.empty)
case class Tree(id: Int, branches: List[Branch])

val trees = Seq(Tree(1, List(Branch(2, List.empty), Branch(3, List(Branch(4, List.empty))))))

val ds = spark.createDataset(trees)
ds.show

And this is the error that it throws:

java.lang.UnsupportedOperationException: cannot have circular references in class, but got the circular reference of class Branch
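
For comparison, a minimal check (my addition, with hypothetical `FlatBranch`/`FlatTree` names) showing that a non-recursive variant of the same shape encodes fine, so the problem is specifically the self-reference:

case class FlatBranch(id: Int)
case class FlatTree(id: Int, branches: List[FlatBranch])

val flatTrees = Seq(FlatTree(1, List(FlatBranch(2), FlatBranch(3))))
// Works: encoder derivation terminates because no type refers back to itself.
spark.createDataset(flatTrees).show()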

I know that the maximum hierarchy depth we have is 5. So, as a workaround, I thought of something like:

case class BranchLevel5(id: Int)
case class BranchLevel4(id: Int, branches: List[BranchLevel5] = List.empty)
case class BranchLevel3(id: Int, branches: List[BranchLevel4] = List.empty)
case class BranchLevel2(id: Int, branches: List[BranchLevel3] = List.empty)
case class BranchLevel1(id: Int, branches: List[BranchLevel2] = List.empty)
case class Tree(id: Int, branches: List[BranchLevel1])

Of course, it works. But it is not elegant at all, and you can imagine the pain around the implementation (readability, coupling, maintenance, usability, code duplication, etc.).
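
To make that pain concrete, here is a rough sketch (my addition, not part of the original question; `DomainBranch` is a hypothetical recursive class kept outside Spark) of the per-level conversion code this workaround forces you to duplicate:

// Hypothetical recursive domain model kept outside Spark (sketch only).
case class DomainBranch(id: Int, branches: List[DomainBranch] = List.empty)

// One nearly identical conversion function per level -- the duplication the
// question complains about. Anything deeper than level 5 is silently dropped.
def toLevel5(b: DomainBranch): BranchLevel5 = BranchLevel5(b.id)
def toLevel4(b: DomainBranch): BranchLevel4 = BranchLevel4(b.id, b.branches.map(toLevel5))
def toLevel3(b: DomainBranch): BranchLevel3 = BranchLevel3(b.id, b.branches.map(toLevel4))
def toLevel2(b: DomainBranch): BranchLevel2 = BranchLevel2(b.id, b.branches.map(toLevel3))
def toLevel1(b: DomainBranch): BranchLevel1 = BranchLevel1(b.id, b.branches.map(toLevel2))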

So the question is: how can I handle circular references in the model?

angelcervera

1 Answer


If you're fine with using a private API, then I found one way that just works: treating the whole self-referential structure as a user-defined type. I'm following the approach from this answer: https://stackoverflow.com/a/51957666/1823254.

package org.apache.spark.custom.udts // UDTRegistration is private[spark], so this file must live under the 'org.apache.spark' package

import java.io._
import org.apache.spark.sql.types.{DataType, UDTRegistration, UserDefinedType}

// Assumes the Branch case class from the question is visible here
// (import it from wherever it is defined).
class BranchUDT extends UserDefinedType[Branch] {

  // The whole recursive structure is stored in a single binary column.
  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType

  // Plain Java serialization; case classes are Serializable, recursion included.
  override def serialize(obj: Branch): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  override def deserialize(datum: Any): Branch = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    ois.close()
    obj.asInstanceOf[Branch]
  }

  override def userClass: Class[Branch] = classOf[Branch]
}

object BranchUDT {
  // Call once, before creating any Dataset whose type refers to Branch.
  def register() = UDTRegistration.register(classOf[Branch].getName, classOf[BranchUDT].getName)
}

Just create and register a custom UDT, and that's it!

BranchUDT.register()
val trees = Seq(Tree(1, List(Branch(2, List.empty), Branch(3, List(Branch(4, List.empty))))))

val ds = spark.createDataset(trees)
ds.show(false)

//+---+----------------------------------------------------+
//|id |branches                                            |
//+---+----------------------------------------------------+
//|1  |[Branch(2,List()), Branch(3,List(Branch(4,List())))]|
//+---+----------------------------------------------------+
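
As a quick follow-up (my addition, not from the original answer), collecting the Dataset back shows that the UDT round-trips the recursive structure:

// head() forces deserialization through BranchUDT back into case class instances.
val first = ds.head()
println(first.branches.head.id)              // 2
println(first.branches(1).branches.head.id)  // 4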
Worakarn Isaratham
  • Good point, but I don't like the idea of using hidden private stuff that can easily change in new versions. IMO, if it is private, it is surely for a reason. – angelcervera Sep 22 '18 at 13:08
  • I cannot make your code work. Where do you put this line? UDTRegistration.register(classOf[Branch].getName, classOf[BranchUDT].getName) – angelcervera Sep 23 '18 at 15:39
  • @angelcervera It's the part that accesses the private API, so it needs to be in a file under the org.apache.spark package. I'll update the post. – Worakarn Isaratham Sep 23 '18 at 16:46
  • I tried that before and it did not work; I must have done something wrong. The important thing is that your code works like a charm (except the kryo import, which for me is com.esotericsoftware.kryo.Kryo). Thanks! – angelcervera Sep 23 '18 at 18:19
  • The big limitation of the solution is that it is not possible to access the fields of the custom type from Spark SQL or to use the built-in functions (see the sketch after these comments). :( – angelcervera Sep 23 '18 at 18:34
  • BTW, why do you need to create an instance of Kryo if you never use it? – angelcervera Sep 24 '18 at 09:00
  • You're right about the limitation - once we treat it as a custom type, all column-based operations no longer work. That's probably why circular types are forbidden in the first place. – Worakarn Isaratham Sep 24 '18 at 09:18
  • and yes the Kryo instance is unnecessary - thanks for the catch! – Worakarn Isaratham Sep 24 '18 at 09:19
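
To make the limitation from the comments concrete, here is a minimal sketch (my addition, assuming the usual `import spark.implicits._` is in scope, e.g. in spark-shell): typed Dataset operations still work because they deserialize through the UDT, while Spark SQL column expressions only see opaque UDT/binary values and cannot reach the Branch fields.

import spark.implicits._

// Typed API: fine, the encoder rebuilds real Branch objects.
val branchIds = ds.map(tree => (tree.id, tree.branches.map(_.id)))
branchIds.show()

// Column API: the elements of `branches` are opaque to Spark SQL, so an
// expression like the following cannot reach into the Branch fields:
// ds.selectExpr("branches[0].id").show()   // fails: Branch is not a StructType Spark can inspect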