I’m looking for a way to compare subsets of an RDD intelligently.

Let's say I have an RDD of key/value pairs of type (Int, T). I eventually need to say "compare all values of key 1 with all values of key 2, and compare the values of key 3 with the values of key 5 and key 7". How would I go about doing this efficiently?

The way I’m currently thinking of doing it is by creating a List of filtered RDDs and then using RDD.cartesian()

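import org.apache.spark.rdd.RDD

// Keep only the records whose key equals b
def filterSubset[T](b: Int, r: RDD[(Int, T)]): RDD[(Int, T)] =
  r.filter { case (name, _) => name == b }

val keyPairs: Seq[(Int, Int)] = ??? // all key pairs

// r is the source RDD[(Int, T)]; build one cartesian RDD per key pair
val rddPairs = keyPairs.map { case (a, b) =>
  filterSubset(a, r).cartesian(filterSubset(b, r))
}

// rddPairs.map { whatever I want to compare… }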

I would then iterate the list and perform a map on each of the RDDs of pairs to gather the relational data that I need.

What I can't tell about this idea is whether it would be extremely inefficient to set up possibly hundreds of map jobs and then iterate through them. In this case, would lazy evaluation in Spark optimize the data shuffling between all of the maps? If not, can someone please recommend a more efficient way to approach this issue?

Thank you for your help

Daniel Imberman
  • Can you better explain your comparison logic? Which keys do you want to compare? – Jean Logeart Jan 04 '16 at 19:34
  • Sure, so a good example of this would be if I had an RDD[(Int, Vector)] and I only wanted to calculate the cosine similarity of vectors with associated keys (the keys essentially partition different groups of vectors). – Daniel Imberman Jan 04 '16 at 19:37
  • How much data do you have per key? Is it a one-to-one mapping or do you want to compare with multiple subsets (like 1 with {2, 3, 5, 7})? – zero323 Jan 04 '16 at 20:58
  • @zero323 So the ultimate goal would be to compare 1 with {2, 3, 5, 7...}. In the Hadoop version of the paper I'm working with, they actually serialized the subsets into compressed "forward indexes" and then sent the associated forward indexes to each task (where a task would locally contain the primary values) – Daniel Imberman Jan 04 '16 at 21:06
  • And how about the amount of data? Is it reasonable to assume that a single pair of keys can be processed on one executor? – zero323 Jan 04 '16 at 21:09
  • And how do you use the data after computing similarity? Is there any filter afterwards? – zero323 Jan 04 '16 at 21:11
  • @zero323 When you say a pair of keys, do you mean that the entirety of filterSubset(a) and filterSubset(b) could fit in a single executor? – Daniel Imberman Jan 04 '16 at 21:20
  • Also, there might be some form of filtering afterwards (i.e. if the similarity < threshold it won't be sent to the next step) – Daniel Imberman Jan 04 '16 at 21:20
  • Yes, this is what I mean, and if not, how many machines do you need to process the data for a single key? – zero323 Jan 04 '16 at 21:26

2 Answers

One way you can approach this problem is to replicate and partition your data to reflect the key pairs you want to compare. Let's start by creating two maps from the actual keys to the temporary keys we'll use for replication and joins:

def genMap(keys: Seq[Int]) = keys
  .zipWithIndex.groupBy(_._1)
  .map{case (k, vs) => (k -> vs.map(_._2))}

val left = genMap(keyPairs.map(_._1))
val right = genMap(keyPairs.map(_._2))
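
To make the temporary keys concrete, here is a small standalone sketch in plain Scala (no Spark needed) that runs genMap on the key pairs mentioned in the question. The sample keyPairs value is an assumption made for illustration. Each pair's position in the sequence becomes its temporary key.

```scala
// Same logic as genMap above: map each real key to the positions
// (temporary keys) of the pairs in which it appears.
def genMap(keys: Seq[Int]): Map[Int, Seq[Int]] =
  keys.zipWithIndex
    .groupBy(_._1)
    .map { case (k, vs) => k -> vs.map(_._2) }

// Hypothetical key pairs from the question: 1 vs 2, 3 vs 5, 3 vs 7.
val keyPairs: Seq[(Int, Int)] = Seq((1, 2), (3, 5), (3, 7))

val left  = genMap(keyPairs.map(_._1)) // Map(1 -> Seq(0), 3 -> Seq(1, 2))
val right = genMap(keyPairs.map(_._2)) // Map(2 -> Seq(0), 5 -> Seq(1), 7 -> Seq(2))
```

Key 3 appears on the left side of two pairs, so it receives two temporary keys and its values will be replicated twice.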

Next we can transform data by replicating with new keys:

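import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Emit one copy of each record per temporary key; keys absent from the map are dropped
def mapAndReplicate[T: ClassTag](rdd: RDD[(Int, T)], map: Map[Int, Seq[Int]]) = {
  rdd.flatMap { case (k, v) => map.getOrElse(k, Seq()).map(x => (x, (k, v))) }
}

// rddPairs here is the original RDD[(Int, T)] of key/value pairs
val leftRDD = mapAndReplicate(rddPairs, left)
val rightRDD = mapAndReplicate(rddPairs, right)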
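
The effect of the replication step can be checked locally. The following is only a sketch: replicateLocal is a hypothetical stand-in that applies the same flatMap to a Seq instead of an RDD, and the sample data is made up for illustration.

```scala
// Local stand-in for mapAndReplicate, using Seq instead of RDD.
def replicateLocal[T](data: Seq[(Int, T)],
                      map: Map[Int, Seq[Int]]): Seq[(Int, (Int, T))] =
  data.flatMap { case (k, v) =>
    map.getOrElse(k, Seq.empty).map(x => (x, (k, v)))
  }

// Hypothetical inputs: key 1 is the left side of pair 0,
// key 3 is the left side of pairs 1 and 2, key 4 is in no pair.
val leftMap = Map(1 -> Seq(0), 3 -> Seq(1, 2))
val sample  = Seq((1, "a"), (3, "b"), (4, "c"))

val replicated = replicateLocal(sample, leftMap)
// Seq((0, (1, "a")), (1, (3, "b")), (2, (3, "b")))
```

The record with key 3 is emitted twice, once per temporary key, while the record with key 4 disappears because it takes part in no comparison. This replication is where the extra network traffic comes from.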

Finally we can cogroup:

val cogrouped = leftRDD.cogroup(rightRDD)

And compare / filter pairs:

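// Keep only the pairs whose similarity reaches the threshold
// (per the comments, pairs below the threshold are not passed on)
cogrouped.values.flatMap { case (xs, ys) =>
  for {
    (kx, vx) <- xs
    (ky, vy) <- ys
    if cosineSimilarity(vx, vy) >= threshold
  } yield ((kx, vx), (ky, vy))
}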
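
The snippet above relies on a cosineSimilarity function that is not defined in this answer. A minimal sketch follows, assuming the values are dense vectors stored as Array[Double]; that representation and the zero-vector convention are assumptions, and a Spark Vector would need converting first (for example with toArray).

```scala
// Hypothetical helper: cosine similarity of two equal-length dense vectors.
// Returns 0.0 when either vector has zero norm (a convention assumed here).
def cosineSimilarity(x: Array[Double], y: Array[Double]): Double = {
  require(x.length == y.length, "vectors must have the same length")
  val dot   = x.zip(y).map { case (a, b) => a * b }.sum
  val normX = math.sqrt(x.map(a => a * a).sum)
  val normY = math.sqrt(y.map(b => b * b).sum)
  if (normX == 0.0 || normY == 0.0) 0.0 else dot / (normX * normY)
}
```

Orthogonal vectors score 0 and parallel vectors score 1 (up to floating-point error), so a threshold between those values separates weakly and strongly similar pairs.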

Obviously, in its current form this approach is limited. It assumes that the values for an arbitrary pair of keys fit into memory, and it requires a significant amount of network traffic. Still, it should give you some idea of how to proceed.

Another possible approach is to store the data in an external system (for example, a database) and fetch the required key-value pairs on demand.

Since you're trying to find similarity between elements, I would also consider a completely different approach. Instead of naively comparing key by key, I would try to partition the data using a custom partitioner that reflects the expected similarity between documents. It is far from trivial in general, but it should give much better results.

zero323
  • Thank you zero323. This gives me a lot of potential directions to take this project (I particularly like the idea of using a custom partitioner) – Daniel Imberman Jan 05 '16 at 18:33

Using DataFrames, you can easily do the Cartesian operation with a join:

dataframe1.join(dataframe2, dataframe1("key")===dataframe2("key"))

It will probably do exactly what you want, but efficiently.

If you don't know how to create a DataFrame, please refer to http://spark.apache.org/docs/latest/sql-programming-guide.html#creating-dataframes