Building on @aaronman's observation, there is a workaround for this issue.
`RDD` has two definitions of `distinct`:
```scala
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = distinct(partitions.size)
```
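To make the body of the first `distinct` more concrete, here is a Spark-free sketch of the same map / reduce-by-key / map pipeline over plain Scala collections. It is only an approximation under stated assumptions: `groupBy` plus `reduce` stands in for `reduceByKey`, nothing about partitions or the shuffle is modelled, and `localDistinct` is a hypothetical helper name.

```scala
// Hypothetical local stand-in for RDD.distinct, using plain Scala collections.
// Assumption: groupBy + reduce approximates reduceByKey; no partitions or shuffle.
def localDistinct[T](xs: Seq[T]): Seq[T] =
  xs.map(x => (x, null))                                           // like map(x => (x, null))
    .groupBy(_._1)                                                 // bucket pairs by key (T's hashCode/equals)
    .map { case (k, vs) => (k, vs.map(_._2).reduce((x, _) => x)) } // like reduceByKey((x, y) => x)
    .map(_._1)                                                     // like map(_._1): keep only the keys
    .toSeq

println(localDistinct(Seq(1, 2, 2, 3, 1)).sorted)
```

In this sketch, which elements collapse together is decided by `T`'s `hashCode` and `equals`, since those are what `groupBy` buckets on.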
The signature of the first `distinct` shows that it takes an implicit ordering of the elements, `Ordering[T]`, which defaults to `null` when none is available; that default is what the short version `.distinct()` ends up using.
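The default-`null` behaviour can be seen in isolation with a hypothetical helper, `orderingStatus`, whose second parameter list has the same shape as the one on the first `distinct`. This is a plain-Scala sketch, not Spark code; `Plain` is an illustrative case class with no ordering of its own.

```scala
// Hypothetical helper mirroring distinct's second parameter list:
// an implicit Ordering[T] that defaults to null when none can be found.
def orderingStatus[T](xs: Seq[T])(implicit ord: Ordering[T] = null): String =
  if (ord == null) "no Ordering in scope" else "Ordering found"

// Illustrative case class with no Ordering of its own.
case class Plain(name: String)

println(orderingStatus(Seq(3, 1, 2)))        // "Ordering found": Ordering.Int is implicit
println(orderingStatus(Seq(Plain("peter")))) // "no Ordering in scope": ord falls back to null
```

No error is raised in the second call: when implicit search fails, Scala simply uses the parameter's default value.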
Case classes have no default implicit ordering, but it's easy to provide one by extending `Ordered`:
```scala
case class Person(name: String, tel: String) extends Ordered[Person] {
  def compare(that: Person): Int = this.name compare that.name
}
```
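To check that extending `Ordered` really does make an implicit `Ordering[Person]` available (through the standard library's `Ordering.ordered`), here is a plain-Scala sketch with a copy of `Person`. No Spark is involved, and the name comparison uses Java's `String.compareTo`.

```scala
// Copy of Person; the name comparison uses java.lang.String.compareTo.
case class Person(name: String, tel: String) extends Ordered[Person] {
  def compare(that: Person): Int = this.name.compareTo(that.name)
}

// Ordered[Person] extends Comparable[Person], so Ordering.ordered can now
// supply an implicit Ordering[Person].
val ord: Ordering[Person] = implicitly[Ordering[Person]]

val people = Seq(Person("peter", "138"), Person("peter", "55"), Person("john", "138"))
println(people.sorted.map(_.name))                                  // List(john, peter, peter)
println(ord.compare(Person("peter", "138"), Person("peter", "55"))) // 0: same name
println(Person("peter", "138") == Person("peter", "55"))            // false: equals checks every field
```

Note that `compare` looks only at `name`, while the compiler-generated `equals` still compares both fields, so the two notions of sameness differ for `Person`.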
Now, running the same example gives the expected results (note that I'm comparing names):
```scala
val ps5 = Array(Person("peter", "138"), Person("peter", "55"), Person("john", "138"))
sc.parallelize(ps5).distinct.collect
res: Array[P5] = Array(P5(john,111), P5(peter,139))
```
Note that case classes already implement `equals` and `hashCode`, so the implementation in the provided example is unnecessary and also incorrect. The correct signature for `equals` is `equals(arg0: Any): Boolean`.
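The difference between overloading and overriding `equals` can be shown with two hypothetical plain classes (not case classes), `OverloadedEquals` and `OverriddenEquals`. This is a minimal sketch: generic code such as `Set` goes through `equals(Any)`, so only the overriding version is ever consulted there.

```scala
// Hypothetical class that *overloads* equals with a narrower parameter type.
// Generic code still calls equals(Any), which here remains reference equality.
class OverloadedEquals(val name: String) {
  def equals(that: OverloadedEquals): Boolean = this.name == that.name
  override def hashCode(): Int = name.hashCode()
}

// Hypothetical class that *overrides* Any.equals with the correct signature.
class OverriddenEquals(val name: String) {
  override def equals(that: Any): Boolean = that match {
    case o: OverriddenEquals => this.name == o.name
    case _                   => false
  }
  override def hashCode(): Int = name.hashCode()
}

println(new OverloadedEquals("peter").equals(new OverloadedEquals("peter")))    // true: overload picked statically
println(Set(new OverloadedEquals("peter"), new OverloadedEquals("peter")).size) // 2: Set uses equals(Any)
println(Set(new OverriddenEquals("peter"), new OverriddenEquals("peter")).size) // 1
```

The overload only "works" when the compiler can see both arguments as `OverloadedEquals` at the call site, which is exactly why it is easy to be misled by it.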
BTW, I first thought the issue had to do with the incorrect `equals` signature, which sent me down the wrong path.