45

I have trouble finding in the Spark documentation which operations cause a shuffle and which do not. In this list, which ones cause a shuffle and which ones do not?

Map and filter do not. However, I am not sure about the others.

map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
SparkleGoat
poiuytrez

4 Answers

51

It is actually very easy to find this out without the documentation. For any of these functions, just create an RDD and call toDebugString on it. Here is one example; you can do the rest on your own.

scala> val a  = sc.parallelize(Array(1,2,3)).distinct
scala> a.toDebugString
MappedRDD[5] at distinct at <console>:12 (1 partitions)
  MapPartitionsRDD[4] at distinct at <console>:12 (1 partitions)
    **ShuffledRDD[3] at distinct at <console>:12 (1 partitions)**
      MapPartitionsRDD[2] at distinct at <console>:12 (1 partitions)
        MappedRDD[1] at distinct at <console>:12 (1 partitions)
          ParallelCollectionRDD[0] at parallelize at <console>:12 (1 partitions)

So, as you can see, distinct creates a shuffle. It is also particularly important to find this out this way rather than from the docs, because whether a given function requires a shuffle can depend on the situation. For example, join usually requires a shuffle, but if you join two RDDs that branch from the same RDD, Spark can sometimes elide the shuffle.
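The same check can be done programmatically instead of reading the lineage by eye. The following is only a minimal sketch, not part of the original answer: `ShuffleCheck` and `introducesShuffle` are hypothetical names, and the local master and sample data are assumptions. It walks `rdd.dependencies` from a result back to the given input RDDs and reports whether a `ShuffleDependency` appears in between, so shuffles from earlier transformations are not counted.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ShuffleCheck {
  // True if a shuffle dependency appears between `result` and the given `inputs`.
  // Lineage beyond the inputs is ignored, so earlier shuffles are not counted.
  def introducesShuffle(result: RDD[_], inputs: RDD[_]*): Boolean = {
    val stopAt = inputs.map(_.id).toSet
    def walk(rdd: RDD[_]): Boolean =
      !stopAt.contains(rdd.id) && rdd.dependencies.exists {
        case _: ShuffleDependency[_, _, _] => true
        case narrow                        => walk(narrow.rdd)
      }
    walk(result)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local context, just for inspecting lineage.
    val sc = new SparkContext(
      new SparkConf().setAppName("shuffle-check").setMaster("local[2]"))
    try {
      val nums = sc.parallelize(Seq(1, 2, 3, 2, 1))
      println(s"map:      ${introducesShuffle(nums.map(_ * 2), nums)}")
      println(s"distinct: ${introducesShuffle(nums.distinct(), nums)}")

      val part  = new HashPartitioner(4)
      val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
      val right = sc.parallelize(Seq(1 -> "x", 2 -> "y"))
      println(s"join, inputs not partitioned: " +
        introducesShuffle(left.join(right, part), left, right))

      // Both sides already use the partitioner that the join asks for.
      val leftP  = left.partitionBy(part)
      val rightP = right.partitionBy(part)
      println(s"join, inputs co-partitioned:  " +
        introducesShuffle(leftP.join(rightP, part), leftP, rightP))
    } finally sc.stop()
  }
}
```

On the Spark versions I am familiar with, this should report a shuffle for `distinct` and for the join on unpartitioned inputs, and none for `map` or for the join on co-partitioned inputs. Note that the `partitionBy` calls shuffle themselves; only the join step avoids it.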

poiuytrez
aaronman
  • 14
    It's just a generally good point about programming actually - try to answer questions by reading code and using code over reading documentation. Documentation can often contain duplication of information, and with duplication comes errors and loss of information. When reading code, one can know *exactly* what is going on :) – samthebest Dec 07 '14 at 10:34
  • 1
    According to the documentation, `toDebugString` returns "A description of this RDD and its recursive dependencies for debugging." So it will include possible shuffles from prior transformations if they exist, even if the most recent transformation does not incur shuffle, right? – CyberPlayerOne Mar 01 '18 at 05:10
25

Here is a list of operations that might cause a shuffle:

cogroup

groupWith

join: hash partition

leftOuterJoin: hash partition

rightOuterJoin: hash partition

groupByKey: hash partition

reduceByKey: hash partition

combineByKey: hash partition

sortByKey: range partition

distinct

intersection: hash partition

repartition

coalesce

Source: Big Data Analysis with Spark and Scala, Optimizing with Partitions, Coursera
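The "hash partition" and "range partition" notes in the list above can be checked on a running Spark by looking at the `partitioner` field of the resulting RDD. This is only a small sketch under assumptions: `PartitionerCheck` and `partitionerName` are hypothetical names, and the local master and sample data are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionerCheck {
  // Simple class name of the partitioner attached to an RDD, if it has one.
  def partitionerName(rdd: RDD[_]): Option[String] =
    rdd.partitioner.map(_.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local context and a tiny pair RDD.
    val sc = new SparkContext(
      new SparkConf().setAppName("partitioner-check").setMaster("local[2]"))
    try {
      val pairs = sc.parallelize(Seq("b" -> 2, "a" -> 1, "b" -> 3), 2)
      println(s"input:       ${partitionerName(pairs)}")
      println(s"reduceByKey: ${partitionerName(pairs.reduceByKey(_ + _))}")
      println(s"groupByKey:  ${partitionerName(pairs.groupByKey())}")
      println(s"sortByKey:   ${partitionerName(pairs.sortByKey())}")
    } finally sc.stop()
  }
}
```

With default settings I would expect no partitioner on the input, a `HashPartitioner` after `reduceByKey` and `groupByKey`, and a `RangePartitioner` after `sortByKey`.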

ruhong
5

This might be helpful: https://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations

or this: http://www.slideshare.net/SparkSummit/dev-ops-training, starting with slide 208

from slide 209: "Transformations that use 'numPartitions' like distinct will probably shuffle"

Glenn Strycker
4

Here is the generalised statement on shuffling transformations.

Transformations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
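Note the word "can" in that statement: for example, `coalesce` only shuffles when asked to, while `repartition` always does. Here is a rough sketch of how to see the difference, using the lineage string from `toDebugString`. `CoalesceVsRepartition` and `lineageHasShuffle` are hypothetical names, and the local master and partition counts are assumptions. The string check is a simplification: it only spots shuffles that show up as a `ShuffledRDD` in the lineage.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoalesceVsRepartition {
  // Crude check: does the printed lineage mention a ShuffledRDD?
  def lineageHasShuffle(rdd: RDD[_]): Boolean =
    rdd.toDebugString.contains("ShuffledRDD")

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local context with 8 input partitions.
    val sc = new SparkContext(
      new SparkConf().setAppName("coalesce-vs-repartition").setMaster("local[2]"))
    try {
      val data = sc.parallelize(1 to 100, 8)
      // coalesce defaults to shuffle = false and merges partitions narrowly.
      println(s"coalesce(2):                 ${lineageHasShuffle(data.coalesce(2))}")
      // Asking for a shuffle explicitly changes the lineage.
      println(s"coalesce(2, shuffle = true): ${lineageHasShuffle(data.coalesce(2, shuffle = true))}")
      // repartition is coalesce with shuffle = true.
      println(s"repartition(16):             ${lineageHasShuffle(data.repartition(16))}")
    } finally sc.stop()
  }
}
```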

source

mrsrinivas