
EDIT: I have a collection of vectors and am trying to compute a pairwise relation between each vector and every other vector. I then need to group the results for each vector. The approaches I'm trying are as follows (I understand that they compute each pair twice):

Option 1:

val myRDD: RDD[MyType] = ...

val grouped: RDD[(MyType, List[MyType])] = myRDD.cartesian(myRDD)
  .mapValues(List(_))
  .reduceByKey((x, y) => x ::: y) // or groupByKey().mapValues(_.toList)

Option 2:

val items: Array[MyType] = myRDD.collect()
val grouped: RDD[(MyType, List[(MyType, MyType)])] = myRDD.map(x => (x, items.map(y => (x, y)).toList))

Option 1 seems like the natural choice, but what I'm finding is that even for very small sets (e.g., ~500 elements, each a list of one hundred Doubles), the reduceByKey (or groupBy, which I've also tried) maps to 40000 ShuffleMapTasks that complete at a rate of about 10 per second. After about 30 minutes, when approx. 1/4 are done, the job fails with a GC out-of-memory error. Is there a way to ensure that the cartesian product preserves partitions? Is there a more efficient way to handle the reduce task? I've also tried different keys (e.g., Ints), but there was no improvement.
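
For completeness, the groupBy variant I've also tried looks roughly like this (groupedAlt is just a placeholder name; it shows the same shuffle behaviour):

val groupedAlt: RDD[(MyType, List[MyType])] = myRDD.cartesian(myRDD)
  .groupByKey()
  .mapValues(_.toList)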

Option 2 is very fast for my particular case because the collection can fit in memory, but of course it seems like a poor choice for larger collections.

I've seen a few similar questions, e.g.,

https://groups.google.com/forum/#!topic/spark-users/TZla5TnAMTU

Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?

I'm sure others have run into this particular problem, and I'd really appreciate any pointers! Thank you.

  • *Why* do you need to group the results on each vector? *Why* are these things vectors? I think you're missing out some important information while confusing the question with unnecessary information; as it stands the question is cryptic. – samthebest Sep 13 '14 at 02:31
  • Sorry for bothering you, but I am having an issue while trying to use `cartesian()` with a type defined by me. In this case it doesn't work, whereas with default Java types it works like a charm. Do you have any idea about the reason for my issue? Thanks – mgaido Feb 26 '15 at 16:11
  • Just my two cents on an old comment, @mark91: if something works on Java standard classes but not on your class, it might not implement the right Spark Function, and it needs to implement Serializable as well. Just an idea – JimLohse Jan 07 '16 at 22:19

1 Answer


You can

  1. Try to solve your shuffle problems by reading Why does a job fail with "No space left on device", but df says otherwise? (though I much prefer the other options)
  2. Use your Option 2. Neither solution will scale well; when the collection gets large you will hit OOMs in both.
  3. Same as 2, but put the collected array in a broadcast variable; you then use up a lot less RAM (one copy of the dataset per node).
  4. Ask another question that details the higher-level picture of what you want to do; chances are you can avoid a cartesian product altogether, but no one can show you how until you give context.

To expand on 3 and my comment, it sounds like you have:

val myThings: RDD[MyType] = ...

And you want

val pairedWithOpResults: RDD[(MyType, MyType, OpResult)]

So I would do (pseudocode):

val myThingsBroadcast = sc.broadcast(myThings.collect())

myThings.flatMap(thingLeft =>
  myThingsBroadcast.value.map(thingRight =>
    (thingLeft, thingRight, myOp(thingLeft, thingRight))))

If you then want to perform some groupBy, you'll need to clearly explain what you want to do with each resulting group.
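
That said, if all you ultimately want per vector is the list of (other vector, result) pairs, you could build that list directly inside a map over the broadcast array and skip the shuffle entirely. A rough sketch along the same lines (groupedPerThing is just a placeholder name, reusing the myOp and OpResult placeholders above):

val groupedPerThing: RDD[(MyType, List[(MyType, OpResult)])] =
  myThings.map(thingLeft =>
    (thingLeft, myThingsBroadcast.value.map(thingRight =>
      (thingRight, myOp(thingLeft, thingRight))).toList))

Since every right-hand element for a given thingLeft is produced in the same task, no groupByKey (and hence no shuffle) is needed.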

  • Sam, thanks for your response. The larger context of this particular problem, as I have detailed in an edit, is to do a pairwise computation over each pair of vectors in the collection. In general, though, I need to do other groupBy operations that are not simple aggregations where the number of keys is on the same order of magnitude as the number of records. – y2s Sep 12 '14 at 14:25
  • *That is, do a pairwise computation and then group the results for each vector with every other vector. – y2s Sep 12 '14 at 15:01
  • @y2s (I've updated to include broadcast variables). I'm still a bit fuzzy; it sounds like the fact these things are vectors is unimportant ... I'll update to say what I mean – samthebest Sep 13 '14 at 02:18
  • Thanks, Sam -- I had been thinking about broadcast variables as well. If the RDD is too large to collect and broadcast, is it reasonable to chunk or otherwise stream it and broadcast in pieces? The fact that the types are vector-like is partially relevant, as in certain cases I would like to do computations over each list (e.g., compute distances as in KNN, summary stats., etc.). (Aside: there is an interesting PR for pairwise cosine distances in https://issues.apache.org/jira/browse/SPARK-2885.) – y2s Sep 13 '14 at 22:27
  • What we do to handle huge BVs is stop a worker on one node and use that node for the driver (giving the driver process a lot of RAM). This means you must sacrifice one of your nodes :/ but is much nicer than using a sequence/stream of BVs IMO ... It would be nice if there was a way to reuse a worker process as a driver too. – samthebest Sep 14 '14 at 02:23