
Given two large key-value pair RDDs (d1 and d2), both composed of unique ID keys and vector values (e.g. RDD[(Int, DenseVector)]), I need to map d1 so as to obtain, for each of its elements, the ID of the closest element in d2 under the Euclidean distance between vectors.

I have not found a way to do this using standard RDD transformations. I understand that nested RDDs are not allowed in Spark; however, if they were possible, an easy solution would be:

d1.map { case (k, v) =>
  (k, d2.map { case (k2, v2) => val diff = v - v2; (k2, sqrt(diff dot diff)) }
        .takeOrdered(1)(Ordering.by[(Int, Double), Double](_._2))
        .head._1)
}

Moreover, if d1 were small, I could work with a Map (e.g. d1.collectAsMap()) and loop over each of its elements, but this is not an option due to the dataset sizes.
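For illustration, the small-dataset idea above would look roughly like this on plain Scala collections (a local sketch, no Spark involved; d1Local stands in for the result of d1.collectAsMap(), and the inner linear scan over d2 would be a distributed min in the real case):

```scala
// Local sketch of the "small d1" approach: d1Local plays the role of
// d1.collectAsMap(); for each of its entries we scan d2 for the closest vector.
object SmallSideSketch {
  type Vec = Vector[Double]

  val d1Local: Map[Int, Vec] = Map(1 -> Vector(0.0, 0.0), 2 -> Vector(1.0, 0.0))
  val d2: Seq[(Int, Vec)] = Seq(1 -> Vector(0.0, 0.75), 2 -> Vector(0.0, 0.25))

  // Euclidean distance between two vectors of equal length
  def dist(a: Vec, b: Vec): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // For each collected d1 entry, keep the ID of the closest d2 element.
  val closest: Map[Int, Int] =
    d1Local.map { case (k1, v1) => k1 -> d2.minBy { case (_, v2) => dist(v1, v2) }._1 }
}
```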

Is there any alternative to this transformation in Spark?

EDIT 1:

Using @holden's and @david-griffin's suggestions, I solved the issue with cartesian() and reduceByKey(). This is the script (assuming sc is the SparkContext and that the Breeze library is used).

val d1 = sc.parallelize(List((1,DenseVector(0.0,0.0)), (2,DenseVector(1.0,0.0)), (3,DenseVector(0.0,1.0))))
val d2 = sc.parallelize(List((1,DenseVector(0.0,0.75)), (2,DenseVector(0.0,0.25)), (3,DenseVector(1.0,1.0)), (4,DenseVector(0.75,0.0))))

val d1Xd2 = d1.cartesian(d2)
val pairDistances = d1Xd2.map{case ((k1, v1), (k2, v2)) => (k1, (k2, sqrt(sum(pow(v1-v2,2)))))}
val closestPoints = pairDistances.reduceByKey{case (x, y) => if (x._2 < y._2) x else y }

closestPoints.foreach(s => println(s._1 + " -> " + s._2._1))

The output obtained is:

1 -> 2
2 -> 4
3 -> 1
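As a sanity check, the same cartesian() + reduceByKey() logic can be reproduced on plain Scala collections (no Spark or Breeze; groupBy/minBy stand in for reduceByKey, and a for-comprehension stands in for cartesian()):

```scala
// Local re-check of the cartesian() + reduceByKey() script above,
// using plain collections instead of RDDs and Breeze vectors.
object NearestNeighborCheck {
  type Vec = Vector[Double]

  val d1: Seq[(Int, Vec)] =
    Seq(1 -> Vector(0.0, 0.0), 2 -> Vector(1.0, 0.0), 3 -> Vector(0.0, 1.0))
  val d2: Seq[(Int, Vec)] =
    Seq(1 -> Vector(0.0, 0.75), 2 -> Vector(0.0, 0.25), 3 -> Vector(1.0, 1.0), 4 -> Vector(0.75, 0.0))

  // Euclidean distance between two vectors of equal length
  def dist(a: Vec, b: Vec): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // cartesian(): every (d1, d2) pair, keyed by the d1 ID
  val pairDistances: Seq[(Int, (Int, Double))] =
    for ((k1, v1) <- d1; (k2, v2) <- d2) yield (k1, (k2, dist(v1, v2)))

  // reduceByKey(): per d1 key, keep the d2 ID at minimum distance
  val closestPoints: Map[Int, Int] =
    pairDistances.groupBy(_._1).map { case (k, ps) => k -> ps.map(_._2).minBy(_._2)._1 }
}
```

The result matches the Spark output above: 1 -> 2, 2 -> 4, 3 -> 1.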
Jorge Luis
    I would convert them to DataFrames, and try and do it with a `join` between the two. Otherwise, I think you have to do `d1.cartesian(d2)` and then use `reduce` to find the shortest distance for each `d1._1` – David Griffin May 27 '15 at 14:46
    Pretty much a duplicate of this: http://stackoverflow.com/a/29953122/21755 – The Archetypal Paul May 28 '15 at 07:58
  • If both datasets are large, you cannot compare every element to every other element. You need to use a space partitioning scheme, then `join` the partitions and find the best matches within the partitions. – Daniel Darabos May 28 '15 at 09:18
  • @Paul the answer you redirect to also works. It generalizes when the number of neighbors needed is more than one (e.g. for KNN). – Jorge Luis May 28 '15 at 14:24

1 Answer


Transformations on RDDs can only be invoked on the driver side, not from within other transformations, so nesting of maps won't work. As @davidgriffin points out, you can use cartesian. For your use case you probably want to follow that up with reduceByKey, and inside your reduce you can keep track of the minimum distance.

Holden