
Here is my code example:

 case class Person(name: String, tel: String) {
   def equals(that: Person): Boolean = that.name == this.name && this.tel == that.tel
 }

 val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111"))
 sc.parallelize(persons).distinct.collect

It returns

 res34: Array[Person] = Array(Person(john,111), Person(peter,139), Person(peter,139))

Why doesn't distinct work? I want the result to be Person("john","111"), Person("peter","139").

edwardsbean
  • I wonder if it has something to do with "peter" not being the same as "perter"? – kviiri Jul 22 '14 at 11:35
  • How much time did you spend looking into the problem before posting it? What do you expect as the result of this test? – maasg Jul 22 '14 at 11:49
  • Flagged for closing as this appears to be caused by a simple typographical error. – kviiri Jul 22 '14 at 11:51
  • The typo was unfortunate while writing the question. I tried this on Spark and it is indeed an issue. I reverted my earlier -1. – maasg Jul 22 '14 at 12:54
  • @kviiri this is a real (and rather puzzling) issue. Could you revert the close vote? – maasg Jul 22 '14 at 13:41
  • @maasg I agree this is peculiar; kviiri doesn't have enough rep to close vote, though, so it wasn't him. – aaronman Jul 22 '14 at 14:07
  • I investigated further in the source code, and the root cause is that case classes do not seem to work as keys in Spark. I've created a discussion on the mailing list: http://apache-spark-user-list.1001560.n3.nabble.com/Using-case-classes-as-keys-does-not-seem-to-work-td10407.html – maasg Jul 22 '14 at 14:25
  • FYI, I created a bug against Spark 1.0.0: https://issues.apache.org/jira/browse/SPARK-2620 – maasg Jul 22 '14 at 16:08
  • What happens if you don't write your own equals method? After all, you have no need to. Just a shot in the dark as a workaround. – samthebest Jul 22 '14 at 20:52
  • @maasg, I'm sorry, but at the time I flagged it there really was a typo and the issue was fundamentally different. – kviiri Jul 23 '14 at 05:07
  • (Also, it appears that I can't retract any flags, which is a rather annoying "feature" of the site.) – kviiri Jul 23 '14 at 05:14

3 Answers


Building on @aaronman's observation, there is a workaround for this issue. RDD has two definitions of distinct:

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = distinct(partitions.size)
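The first overload deduplicates by pairing each element with null, merging pairs that share a key via reduceByKey, and keeping only the keys. A rough local analogue, assuming an ordinary Scala collection in place of an RDD and groupBy in place of reduceByKey (the helper distinctViaKeys is hypothetical, not a Spark API), makes it visible that the result hinges entirely on the key's equals and hashCode:

```scala
case class Person(name: String, tel: String)

object DistinctViaKeys {
  // Local stand-in for RDD.distinct; groupBy plays the role of reduceByKey,
  // and both decide "same key" using equals/hashCode.
  def distinctViaKeys[T](xs: Seq[T]): Seq[T] =
    xs.map(x => (x, null))                   // like map(x => (x, null))
      .groupBy(_._1)                         // pairs with equal keys end up together
      .values
      .map(_.reduce((first, _) => first))    // like reduceByKey((x, y) => x)
      .map(_._1)                             // like map(_._1)
      .toSeq

  def main(args: Array[String]): Unit = {
    val persons = Seq(Person("peter", "139"), Person("peter", "139"), Person("john", "111"))
    println(distinctViaKeys(persons))        // two elements; order is not guaranteed
  }
}
```

Outside the Spark shell, the case class's generated equals/hashCode make the two "peter" records a single key, which is the behaviour the question expected.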

It's apparent from the signature of the first distinct that it takes an implicit ordering of the elements, which defaults to null when none is available; that is what happens with the short version .distinct().
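To see what a null default on an implicit parameter means in practice, here is a minimal sketch outside Spark. The method describeOrdering and the class Point are hypothetical; only the parameter shape is copied from distinct. When implicit search finds no Ordering[T], the compiler silently falls back to the default; when one is in scope, it is passed instead:

```scala
object DefaultImplicitDemo {
  case class Point(x: Int) // no Ordering[Point] is defined anywhere

  // Same parameter shape as RDD.distinct(numPartitions)(implicit ord: Ordering[T] = null)
  def describeOrdering[T](n: Int)(implicit ord: Ordering[T] = null): String =
    if (ord == null) "no ordering" else "has ordering"

  def main(args: Array[String]): Unit = {
    println(describeOrdering[Point](2)) // implicit search fails, so the null default is used
    println(describeOrdering[Int](2))   // the standard Ordering for Int is found
  }
}
```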

There's no default implicit ordering for case classes, but it's easy to implement one:

case class Person(name:String,tel:String) extends Ordered[Person] {
  def compare(that: Person): Int = this.name compare that.name
}
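A short sketch of why this satisfies the implicit, assuming only the Scala standard library: a class that extends Ordered[Person] picks up an Ordering[Person] through the library's built-in Ordering.ordered implicit, so implicitly[Ordering[Person]] compiles and sorting uses compare. The object name OrderedDemo is hypothetical:

```scala
case class Person(name: String, tel: String) extends Ordered[Person] {
  def compare(that: Person): Int = this.name compare that.name
}

object OrderedDemo {
  // Resolves because Person extends Ordered[Person] (and hence Comparable[Person])
  val ord: Ordering[Person] = implicitly[Ordering[Person]]

  // sorted uses that Ordering, i.e. compare, i.e. ordering by name
  val sorted: Seq[Person] = Seq(Person("peter", "139"), Person("john", "111")).sorted

  def main(args: Array[String]): Unit = println(sorted)
}
```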

Now, trying the same example delivers the expected results (note that I'm comparing names):

val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111"))
sc.parallelize(persons).distinct.collect

res: Array[Person] = Array(Person(john,111), Person(peter,139))

Note that case classes already implement equals and hashCode, so the implementation in the question's example is unnecessary and also incorrect: the correct signature is equals(that: Any): Boolean, and a method taking a Person only overloads equals rather than overriding it. (I first thought the issue was caused by this incorrect signature, which sent me down the wrong path.)
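The overload-versus-override difference can be shown directly in plain Scala; the class name Overloaded and the object OverloadDemo are hypothetical. Code written against Any, such as hash-based collections, only ever calls equals(Any), so an extra equals taking the case class is chosen only when the argument's static type is that class:

```scala
case class Overloaded(name: String, tel: String) {
  // Overloads, but does not override, the compiler-generated equals(Any)
  def equals(that: Overloaded): Boolean = that.name == this.name
}

object OverloadDemo {
  val a = Overloaded("peter", "139")
  val b = Overloaded("peter", "000")

  val viaOverload: Boolean = a.equals(b)      // static type Overloaded: the overload runs
  val viaAny: Boolean = a.equals(b: Any)      // static type Any: the generated equals runs
  val setSize: Int = Set[Any](a, b).size      // Set relies on equals(Any) and hashCode

  def main(args: Array[String]): Unit = println((viaOverload, viaAny, setSize))
}
```

Here viaOverload is true, while viaAny is false and the set keeps both elements, because the generated equals also compares tel.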

maasg

For me, the problem was related to object equality, as described by Martin Odersky in Programming in Scala (chapter 30), although I have a normal class (not a case class). For a correct equality test, you must override hashCode() whenever you define a custom equals(), and you also need a canEqual() method for full correctness when subclasses are involved. I haven't looked at the implementation details of RDD, but since it is a collection, it probably uses some parallel variant of a HashSet or another hash-based data structure to compare objects and ensure distinctness.
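A minimal sketch of why a custom equals without a matching hashCode breaks hash-based deduplication, using a local mutable.HashSet as a stand-in for whatever Spark uses internally (the class names are hypothetical). The two EqualsOnly objects compare equal, but they keep AnyRef's identity hash codes and so will normally land in different buckets; once hashCode agrees with equals, the duplicate is dropped:

```scala
import scala.collection.mutable

// Overrides equals but keeps the identity-based hashCode: violates the contract
class EqualsOnly(val name: String) {
  override def equals(other: Any): Boolean = other match {
    case that: EqualsOnly => name == that.name
    case _                => false
  }
}

// Overrides both, so equal objects always produce the same hash code
class EqualsAndHash(val name: String) {
  override def equals(other: Any): Boolean = other match {
    case that: EqualsAndHash => name == that.name
    case _                   => false
  }
  override def hashCode(): Int = name.hashCode
}

object HashContractDemo {
  val brokenEqual: Boolean = new EqualsOnly("peter") == new EqualsOnly("peter")
  // Usually 2, because the identity hash codes of the two objects differ
  val brokenSize: Int = mutable.HashSet(new EqualsOnly("peter"), new EqualsOnly("peter")).size
  val fixedSize: Int = mutable.HashSet(new EqualsAndHash("peter"), new EqualsAndHash("peter")).size

  def main(args: Array[String]): Unit = println((brokenEqual, brokenSize, fixedSize))
}
```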

Declaring hashCode(), equals(), canEqual(), and compare() methods solved my problem:

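// Enclosing class assumed: only the methods were shown originally.
// Extending Ordered[Person] gives compare() a purpose.
class Person(val name: String, val tel: String) extends Ordered[Person] {

  // Must agree with equals: equal objects need equal hash codes
  override def hashCode(): Int = {
    41 * (41 + name.hashCode) + tel.hashCode
  }

  override def equals(other: Any): Boolean = other match {
    case other: Person =>
      (other canEqual this) &&
      (this.name == other.name) && (this.tel == other.tel)
    case _ =>
      false
  }

  // Lets subclasses refuse equality with Person (Programming in Scala, ch. 30)
  def canEqual(other: Any): Boolean = other.isInstanceOf[Person]

  // Orders persons by name
  def compare(that: Person): Int = {
    this.name compare that.name
  }
}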
Javad

As others have pointed out, this is a bug in Spark 1.0.0. My theory about where it comes from: if you look at the diff from 0.9.0 to 1.0.0, you see

-  def repartition(numPartitions: Int): RDD[T] = {
+  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {

And if you run

case class A(i:Int)
implicitly[Ordering[A]]  

you get an error:

<console>:13: error: No implicit Ordering defined for A.
              implicitly[Ordering[A]]  

So I think the workaround is to define an implicit ordering for the case class. Unfortunately, I'm not a Scala expert, but this answer seems to do it correctly.
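One way to define such an implicit ordering, as a minimal sketch: put it in the case class's companion object, where implicit search looks automatically, so the implicitly[Ordering[A]] call that failed above now compiles. Ordering by i is an arbitrary choice for illustration, and the names orderingByI and ImplicitOrderingDemo are hypothetical:

```scala
case class A(i: Int)

object A {
  // Found by implicit search because it lives in A's companion object
  implicit val orderingByI: Ordering[A] = Ordering.by((a: A) => a.i)
}

object ImplicitOrderingDemo {
  val ord: Ordering[A] = implicitly[Ordering[A]] // resolves to A.orderingByI

  def main(args: Array[String]): Unit =
    println(List(A(3), A(1), A(2)).sorted) // List(A(1), A(2), A(3))
}
```

In the Spark shell, the class and its companion would need to be entered together (for example with :paste) for the companion to be recognized.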

aaronman
  • @MrQuestion, honestly, this is just a guess; I'm not fully sure how implicit resolution works in Scala. – aaronman Jul 23 '14 at 02:56