I am writing a program and trying to decide whether to use a groupByKey followed by a join, or simply a join.

I have one RDD with many values per key and another RDD with only one value per key, but that value is very large. When I join them, will Spark make many copies of the large value (one for every instance of the smaller value), or will it keep a single copy of the large value and give each joined record a reference to it?

I'd have a situation like this:

val invIndexes: RDD[(Int, InvertedIndex)] // InvertedIndex is very large
val partitionedVectors: RDD[(Int, Vector)]

// groupByKey yields an Iterable (not an Iterator) of values per key
val partitionedTasks: RDD[(Int, (Iterable[Vector], InvertedIndex))] =
  partitionedVectors.groupByKey().join(invIndexes)

val similarities = partitionedTasks.map(/* calculate similarities */)

My question is whether there would be any major difference in space complexity between the code above and this:

val invIndexes: RDD[(Int, InvertedIndex)]
val partitionedVectors: RDD[(Int, Vector)]

val partitionedTasks: RDD[(Int, (Vector, InvertedIndex))] =
  partitionedVectors.join(invIndexes)

val similarities = partitionedTasks.map(/* calculate similarities */)
Daniel Imberman

1 Answer

Technically speaking, there should be no copying. join is basically a cogroup followed by flatMapValues with a nested for comprehension. Assuming that a single co-grouped element looks like this:

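// Assumes the MLlib vector type; adjust the import if you use a different Vector
import org.apache.spark.mllib.linalg.Vectors

// (key, (values from the left RDD, values from the right RDD))
val pair = (1, (
  Seq(Vectors.dense(Array(1.0)), Vectors.dense(Array(2.0))),
  Seq(Vectors.dense(Array(3.0)), Vectors.dense(Array(4.0)))
))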

the subsequent operation is equivalent to:

val result = pair match {
  case (k, (xs, ys)) => xs.flatMap(x => ys.map(y => (k, (x, y))))
}

and, as expected, the vectors are not copied (the first two results share the same left-hand vector instance):

require(result(0)._2._1 eq result(1)._2._1)

As long as the data is handled in memory, or a whole partition is serialized and deserialized together (for example in collect), the shared reference should be preserved, but personally I wouldn't depend on this. Even if you ignore low-level implementation details, a simple shuffle may require a full copy.
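To illustrate why serialization boundaries matter, here is a minimal sketch using plain Java serialization outside Spark. It is not Spark's shuffle code path, and Spark's own serializers have their own behavior; `Big`, `roundTripSeparately`, and `roundTripTogether` are hypothetical names made up for this example. It only shows the general mechanism: records written to one object stream keep their shared reference, while records written to separate streams each get their own copy of the large value.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SharedReferenceDemo {
  // Hypothetical stand-in for the large value (e.g. an InvertedIndex).
  final case class Big(payload: Array[Byte])

  // Serialize every record into its own stream, then read it back.
  // Each record carries a full copy of whatever it references.
  def roundTripSeparately[A <: AnyRef](records: Seq[A]): Seq[A] =
    records.map { r =>
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(r)
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      try in.readObject().asInstanceOf[A] finally in.close()
    }

  // Serialize all records into one stream, then read them back.
  // The stream writes a repeated object once and back-references it afterwards.
  def roundTripTogether[A <: AnyRef](records: Seq[A]): Seq[A] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    records.foreach(r => out.writeObject(r))
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try records.map(_ => in.readObject().asInstanceOf[A]) finally in.close()
  }
}
```

With `val big = SharedReferenceDemo.Big(...)` and `val records = Seq((1, big), (2, big))`, the records returned by `roundTripTogether` still point at one `Big` instance, whereas those returned by `roundTripSeparately` point at two distinct instances.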

zero323
  • Thank you @zero323. Two questions: 1. Are you saying that the data is not copied only if the entire partition is small enough to fit in one machine's memory? Or are you saying that I should be concerned about the object being copied in later functions (in which case this would not be a huge concern, since once I find the similarities I no longer need the inverted index)? – Daniel Imberman Jan 19 '16 at 16:08
  • 2. If you believe that this method would be unreliable, would it be a better idea to do something along the lines of my first code snippet where I groupByKey and then hold on to the InvertedIndex while I traverse the iterator? – Daniel Imberman Jan 19 '16 at 16:08
  • Personally I would consider cogroup and flatMap. You can utilize InvertedIndex directly and save the space required to store all the references. – zero323 Jan 19 '16 at 17:34
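The cogroup-and-flatMap suggestion can be sketched roughly as follows. The per-key logic is written as a plain function over one cogrouped element so it can be tried without a cluster. `Vec`, this simplified `InvertedIndex`, `similarity`, and `scoreGroup` are all hypothetical stand-ins, not the asker's actual types or similarity measure. With real RDDs the wiring would presumably be `partitionedVectors.cogroup(invIndexes).flatMap(CogroupSketch.scoreGroup)`.

```scala
object CogroupSketch {
  // Hypothetical stand-ins for the question's Vector and InvertedIndex types.
  type Vec = Array[Double]
  final case class InvertedIndex(postings: Map[Int, Double])

  // Hypothetical similarity: sum the index weights at the vector's non-zero positions.
  def similarity(v: Vec, index: InvertedIndex): Double =
    v.indices.collect { case i if v(i) != 0.0 => index.postings.getOrElse(i, 0.0) }.sum

  // Processes one cogrouped element: (key, (all vectors for the key, all indexes for the key)).
  // The index is looked up once per key and used directly for every vector,
  // so no (vector, index) pairs are materialized.
  def scoreGroup(
      group: (Int, (Iterable[Vec], Iterable[InvertedIndex]))
  ): Iterable[(Int, Double)] = {
    val (key, (vectors, indexes)) = group
    for {
      index <- indexes.headOption.toList // no index for this key: emit nothing, like an inner join
      v <- vectors
    } yield (key, similarity(v, index))
  }
}
```

Compared with a plain join, this emits only the similarity scores per key rather than one (Vector, InvertedIndex) tuple per vector.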