2

My goal is to have the k nearest neighbours of each data point. I would like to avoid the use of a for loop with lookup and use something else simultaneously on each rdd_distance point, but I can't figure out how to do this.

parsedData = RDD[Object]
//Object have an id and a vector as attribute
//sqdist1 output is a Double

var rdd_distance = parsedData.cartesian(parsedData)
  .flatMap { case (x,y) =>
    if(x.get_id != y.get_id) 
      Some((x.get_id,(y.get_id,sqdist1(x.get_vector,y.get_vector))))
    else None
  }
for(ind1 <- 1 to size) {
  val ind2 = ind1.toString
  val tab1 = rdd_distance.lookup(ind2)
  val rdd_knn0 = sc.parallelize(tab1)
  val tab_knn = rdd_knn0.takeOrdered(k)(Ordering[(Double)].on(x=>x._2))
}

Is that possible without use a for loop with lookup ?

ryanyuyu
  • 6,366
  • 10
  • 48
  • 53
KyBe
  • 842
  • 1
  • 14
  • 33

1 Answers1

2

This code solves your question (but inefficient when the number of parsedData is big).

  rdd_distance.groupByKey().map {
    case (x, iterable) =>
      x -> iterable.toSeq.sortBy(_._2).take(k)
  }

So this is more appropriate solution.

import org.apache.spark.mllib.rdd.MLPairRDDFunctions._    

rdd_distance.topByKey(k)(Ordering.by(-_._2)) // because smaller is better.

Note that this code is included Spark 1.4.0. If you use the earlier version, use this code instead https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala

The idea of topBykey is to use BoundedPriorityQueue with aggregateByKey which retains top k items.

emesday
  • 6,078
  • 3
  • 29
  • 46
  • Unfortunately, parsedData is big, and i want to avoid groupByKey which is, on what i read, not enought performant. – KyBe Jun 26 '15 at 13:13
  • Right, so you need to take a look at `topByKey`. – emesday Jun 26 '15 at 13:14
  • Is there an equivalent which gives me the minByKey rather than topByKey or this is the by(-_._2) which do the trick. – KyBe Jun 27 '15 at 10:12