Given two large key-value pair RDDs (d1 and d2), both composed of unique ID keys and vector values (e.g. RDD[(Int, DenseVector)]), I need to map d1 in order to obtain, for each of its elements, the ID of the closest element in d2 using the Euclidean distance between the vectors.
I have not found a way to do this using standard RDD transformations. I understand that nested RDDs are not allowed in Spark; however, if they were possible, an easy solution would be:
d1.map { case (k, v) => (k, d2.map { case (k2, v2) => val diff = v - v2; (k2, sqrt(diff dot diff)) }
  .takeOrdered(1)(Ordering.by[(Int, Double), Double](_._2))
  .head._1) }
Moreover, if d1 were small, I could work with a local Map (e.g. d1.collectAsMap()) and loop over each of its elements, but this is not an option given the dataset size.
Is there any alternative to this transformation in Spark?
EDIT 1:
Using @holden's and @david-griffin's suggestions, I solved the issue using cartesian() and reduceByKey(). This is the script (assuming sc is the SparkContext and that the Breeze library is used).
import breeze.linalg._
import breeze.numerics._
val d1 = sc.parallelize(List((1, DenseVector(0.0, 0.0)), (2, DenseVector(1.0, 0.0)), (3, DenseVector(0.0, 1.0))))
val d2 = sc.parallelize(List((1, DenseVector(0.0, 0.75)), (2, DenseVector(0.0, 0.25)), (3, DenseVector(1.0, 1.0)), (4, DenseVector(0.75, 0.0))))
val d1Xd2 = d1.cartesian(d2) // all (d1 element, d2 element) pairs
// key by the d1 ID; value is (d2 ID, euclidean distance between the two vectors)
val pairDistances = d1Xd2.map { case ((k1, v1), (k2, v2)) => (k1, (k2, sqrt(sum(pow(v1 - v2, 2))))) }
// for each d1 ID keep only the (d2 ID, distance) pair with the smallest distance
val closestPoints = pairDistances.reduceByKey { case (x, y) => if (x._2 < y._2) x else y }
closestPoints.collect().foreach(s => println(s._1 + " -> " + s._2._1))
The output obtained is:
1 -> 2
2 -> 4
3 -> 1
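As a side note, the reduce step can also be written with an explicit Ordering, which makes the "keep the pair with the smallest distance" intent clearer; this is just a stylistic variant of the same reduceByKey, reusing pairDistances from the script above:
// Same reduction, expressed through an Ordering on the distance component
val byDistance = Ordering.by[(Int, Double), Double](_._2)
val closestPairs = pairDistances.reduceByKey(byDistance.min(_, _))
val closestIds = closestPairs.mapValues(_._1) // RDD[(Int, Int)]: d1 ID -> closest d2 ID
Keep in mind that cartesian() materializes |d1| x |d2| pairs, so the cost of this approach grows quadratically with the dataset sizes.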