6

I'm Trying to perform a K nearest neighbor search using spark.

I have a RDD[Seq[Double]] and I'm planing to return a RDD[(Seq[Double],Seq[Seq[Double]])] with the actual row and a list of neighbors

val out = data.map(row => {
    val neighbours = data.top(num = 3)(new Ordering[Seq[Double]] {
      override def compare(a:Seq[Double],b:Seq[Double]) = {
        euclideanDistance(a,row).compare(euclideanDistance(b,row))*(-1)
      }
    })
  (row,neighbours.toSeq)
})

And it Gives the following error on spark Submit

15/04/29 21:15:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.1.7): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

I understand that nesting RDD is not possible but how do i perform such operations where I can compare every element in the RDD with every other element in the RDD

eliasah
  • 39,588
  • 11
  • 124
  • 154
Vishnu667
  • 768
  • 1
  • 16
  • 39
  • How to compare every element in the RDD with every other element in the RDD ? – Vishnu667 Apr 29 '15 at 15:59
  • 2
    You can use the `cartesian` method to produce an RDD of all possible pairs – stholzm Apr 29 '15 at 16:26
  • 1
    There's a recent question very similar to this but I can't find it right now... – The Archetypal Paul Apr 29 '15 at 16:35
  • 2
    Use cartesian to produce all pairs. Then map over that to calculate the distance between all pairs, with the first element still the original row value. aggregateByKey, using a combine function that maintains the top "k" nearest values? (you could include the euclidean calculation in the combine function, which would be a bit quicker but maybe less clear) – The Archetypal Paul Apr 29 '15 at 16:40

1 Answers1

10

Something like this should do it.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sco= new SparkContext(conf)

// k is the number of nearest neighbors required 
val k = 3

// generate 5 rows of two-dimensional coordinates
val rows = List.fill(5)(List.fill(2)(Math.random))
val dataRDD = sco.parallelize(rows, 1)

// No need for the sqrt as we're just comparing them
def euclidean(a:List[Double], b:List[Double]) = 
 (a zip b) map {case (x:Double, y:Double) => (x-y)*(x-y)} sum

// get all pairs
val pairs = dataRDD.cartesian(dataRDD)

// case class to keep things a bit neater
// the neighbor, and its distance from the current point
case class Entry(neighbor: List[Double], dist:Double)

// map the second element to the element and distance from the first
val pairsWithDist = pairs.map {case (x, y) => (x, Entry(y, euclidean(x,y)))}

// merge a row of pairsWithDist with the ResultRow for this point
def mergeOne(u: List[Entry], v:Entry) = (v::u).sortBy{_.dist}.take(k)

// merge two results from different partitions
def mergeList(u: List[Entry], v:List[Entry]) = (u:::v).sortBy{_.dist}.take(k)

val nearestNeighbors = pairsWithDist
                      .aggregateByKey(List[Entry]())(mergeOne, mergeList)
The Archetypal Paul
  • 41,321
  • 20
  • 104
  • 134
  • What is the *reduce _ + _* for in the euclidean() method? – WestCoastProjects Apr 30 '15 at 01:35
  • It is a shortcut for `reduce{ (a, b) => a + b }`. Multiple underscores in a lambda expression denote different parameters. – stholzm Apr 30 '15 at 05:28
  • The reduce _+_ just does a sum operation on the list[Double] that the map function returns – Vishnu667 Apr 30 '15 at 05:34
  • All correct, but of course it would have been clearer had I used `sum`. Updated – The Archetypal Paul Apr 30 '15 at 06:28
  • @Paul Thank you for your answer. Is this the most effective way of Doing a K- Nearest Neighbor Query using spark or is there any other way to optimize this ? – Vishnu667 Apr 30 '15 at 07:10
  • It's effective :) Do you mean "most efficient" ? If so, I've no idea, it's not something I've looked at in detail, your question just seemed an interesting puzzle. If `k` is large, the merge functions could be a bit more efficient using some form of top (e,.g a k-entry heap) but overall since it has to do pairwise comparisons of every element with every other it's always going to be expensive – The Archetypal Paul Apr 30 '15 at 07:14
  • The dataset is going to be large the k values might start from 10 and may go all the way up to 100 or even more depending on the dataset. Guess I'll stick to this solution and update if i find a more efficient one. – Vishnu667 Apr 30 '15 at 07:28
  • For larger k, http://en.wikipedia.org/wiki/Selection_algorithm and some other SO questions maybe relevant. Are you going to use the nearest-k-neighbours of every point? The most efficient solution is going to be to calculate only what you need... – The Archetypal Paul Apr 30 '15 at 07:59