0

I want to implement my own comparison operators (equals, hashCode, ordering) for a data type I defined in Spark SQL. Although Spark SQL's UDT API still remains private, I followed some examples like this to work around the limitation.

I have a class called MyPoint:

/**
 * A 2-D point that is stored in Spark SQL through [[MyPointUDT]].
 *
 * `equals` and `hashCode` are written by hand and print a trace line on every call, so it
 * is visible whenever they are invoked.
 *
 * @param x the first coordinate
 * @param y the second coordinate
 */
@SQLUserDefinedType(udt = classOf[MyPointUDT])
case class MyPoint(x: Double, y: Double) extends Serializable {

  /**
   * Hash derived from both coordinates.
   *
   * Uses `##` instead of `hashCode()` on the doubles: `equals` compares with `==`, under
   * which `0.0 == -0.0`, yet `0.0.hashCode()` and `(-0.0).hashCode()` differ. `##` agrees
   * with `==`, so two equal points always produce the same hash.
   */
  override def hashCode(): Int = {
    println("hash code")
    31 * (31 * x.##) + y.##
  }

  /**
   * Two points are equal when both coordinates compare equal with `==`.
   * A value that is not a [[MyPoint]] is never equal to one.
   */
  override def equals(other: Any): Boolean = {
    println("equals")
    other match {
      case that: MyPoint => this.x == that.x && this.y == that.y
      case _ => false
    }
  }
}

Then, I have the UDT class:

/**
 * User-defined type that maps [[MyPoint]] onto a Spark SQL array of two doubles.
 */
private class MyPointUDT extends UserDefinedType[MyPoint] {

  /** Storage type: an array of doubles whose elements are never null. */
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  /** Packs the point as the two-element array `[x, y]`. */
  override def serialize(obj: MyPoint): ArrayData = obj match {
    case point: MyPoint =>
      new GenericArrayData2(Array(point.x, point.y))
  }

  /**
   * Rebuilds a point from its array form.
   *
   * Only an `ArrayData` holding exactly two elements is matched; any other input falls
   * through the match and raises a `MatchError`.
   */
  override def deserialize(datum: Any): MyPoint = datum match {
    case values: ArrayData if values.numElements() == 2 =>
      val coordinates = values.toDoubleArray()
      new MyPoint(coordinates(0), coordinates(1))
  }

  /** The Scala class this type represents. */
  override def userClass: Class[MyPoint] = classOf[MyPoint]

  /** Returns this same instance as the nullable variant. */
  override def asNullable: MyPointUDT = this
}

Then I create a simple DataFrame:

// Sample data: p1 and p2 carry identical coordinates, so the DISTINCT query below has a
// duplicate to collapse.
val p1 = MyPoint(1.0, 2.0)
val p2 = MyPoint(1.0, 2.0)
val p3 = MyPoint(10.0, 20.0)
val p4 = MyPoint(11.0, 22.0)

val points = Seq(
  ("P1", p1),
  ("P2", p2),
  ("P3", p3),
  ("P4", p4)
).toDF("label", "point")

// registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement.
points.createOrReplaceTempView("points")
spark.sql("SELECT Distinct(point) FROM points").show()

The problem is: why doesn't the SQL query execute the equals method of the MyPoint class? How are the comparisons being made? How can I implement my own comparison operators in this example?

proxyfss
  • 113
  • 1
  • 8

0 Answers0