
Will rdd1.join(rdd2) cause a shuffle to happen if rdd1 and rdd2 have the same partitioner?

asked by zwb, edited by Daniel Darabos
  • Can you rewrite this question to be more clear? Just because RDDs have partitions on the same machines doesn't mean all keys are always on the same partition across both. What are you asking then? – Sean Owen Feb 08 '15 at 15:06
  • I've rewritten the question completely. I think it makes sense now, but I'm not sure it's what @zwb meant. I did not really understand the original. Feel free to revert my edit and update the question if necessary. – Daniel Darabos Feb 08 '15 at 21:46
  • Thanks. I come from China and my English is poor; I couldn't express myself very clearly, and what you wrote captures what I meant. – zwb Feb 08 '15 at 23:53

1 Answer


No. If two RDDs have the same partitioner, the join will not cause a shuffle. You can see this in CoGroupedRDD.scala:

override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
    }
  }
}

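For intuition, here is a rough sketch of the usage pattern this implies (not from the original answer; the SparkContext setup and sample data are purely illustrative): pre-partition both RDDs with the same HashPartitioner, and the subsequent join takes the OneToOneDependency branch above instead of adding a new shuffle.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("copartition-sketch").setMaster("local[2]"))

// partitionBy shuffles each RDD once up front; cache() keeps the partitioned data around.
val part = new HashPartitioner(4)
val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(part).cache()
val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(part).cache()

// Both inputs now report partitioner == Some(part), so the join's CoGroupedRDD gets
// one-to-one dependencies on its parents and introduces no new shuffle stage.
val joined = rdd1.join(rdd2)
println(joined.toDebugString) // the join itself adds no ShuffledRDD to the lineage
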
Note, however, that the lack of a shuffle does not mean that no data will have to be moved between nodes. It's possible for two RDDs to have the same partitioner (be co-partitioned) yet have their corresponding partitions located on different nodes (not be co-located).

This situation is still better than doing a shuffle, but it's something to keep in mind. Co-location can improve performance, but is hard to guarantee.

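One way to make co-location likely (also suggested in the comments below) is to derive both sides of the join from a single cached parent: mapValues preserves the parent's partitioner, and both children are computed from the same cached blocks. A rough sketch, reusing sc and the imports from the snippet above (again, the data and names are only illustrative):

// The parent is partitioned once and cached, so its partition blocks live on fixed executors.
val parent = sc.parallelize(Seq((1, 10), (2, 20), (3, 30)))
  .partitionBy(new HashPartitioner(4))
  .cache()

// mapValues keeps the partitioner, so left and right stay co-partitioned, and since they
// are built from the same cached blocks they will usually be co-located as well.
val left  = parent.mapValues(_ * 2)
val right = parent.mapValues(_ + 1)

val joined2 = left.join(right) // no shuffle, and typically no cross-node data movement
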
answered by Daniel Darabos, edited by Grega Kešpret
  • Thanks again. I basically understand, and I will read the source code to learn more. – zwb Feb 08 '15 at 23:57
  • Follow-up question: are there any Spark SQL implementations (including separate projects not in the main distro) that take advantage of co-partitioning? – WestCoastProjects Mar 04 '15 at 09:08
  • @javadba: It's probably worth asking in a separate question. http://spark.apache.org/docs/latest/sql-programming-guide.html says about `spark.sql.shuffle.partitions` that it _"configures the number of partitions to use when shuffling data for joins or aggregations."_ So that's a good sign. – Daniel Darabos Mar 04 '15 at 10:10
  • @daniel darabos http://stackoverflow.com/questions/28850596/co-partitioned-joins-in-spark-sql – WestCoastProjects Mar 04 '15 at 17:02
  • @DanielDarabos, what exactly is the difference in performance terms when you say: *the lack of a shuffle does not mean that no data will have to be moved between nodes*? If I understood correctly, when co-partitioned RDDs have their corresponding partitions on different nodes, all the partitions of one RDD will be moved to the nodes holding the corresponding partitions of the other RDD, is that right? Can all that data movement be considered a shuffle after all? – Giorgio Mar 23 '17 at 17:20
  • Sure, if you create your own definition of what "shuffle" means, you can always make it so that what happens when co-partitioned RDDs are joined is a "shuffle". But this word actually has a definition within Spark, and the answer uses this definition. No shuffle takes place when co-partitioned RDDs are joined. – Daniel Darabos Mar 23 '17 at 17:30
  • Thanks for the answer! I have two follow-up questions. How much faster is relocation than repartitioning? And how can co-location be ensured? – AlexanderLedovsky Oct 16 '17 at 15:34
  • Repartitioning is a shuffle: all executors copy to all other executors. Relocation is a one-to-one dependency: each executor only copies from at most one other executor. How much faster it is in your case you can find out by benchmarking. One way to ensure co-location is for the two RDDs to be derived from a common ancestor. – Daniel Darabos Oct 16 '17 at 16:37
  • TWIMC, I'm using a custom partitioner in PySpark, and joining co-partitioned RDDs still triggers a shuffle (in Spark 1.6). – Def_Os Nov 29 '18 at 18:42