from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)], numSlices=8)
rdd2 = rdd1.mapValues(lambda x: x)

These RDDs have the same partitioning:

rdd1.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]

rdd2.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]

There are multiple answers here on SO suggesting that joining co-partitioned data will not cause a shuffle, which makes a lot of sense to me. Example: Does a join of co-partitioned RDDs cause a shuffle in Apache Spark?

However, when I join these co-partitioned RDDs using PySpark, the data is shuffled into a new partition:

rdd1.join(rdd2).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], [], [], [], [], [], [], [], [], []]

And the partitioning changes even when I set the number of new partitions to the original 8:

rdd1.join(rdd2, numPartitions=8).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], []]

How come I can't avoid a shuffle using these co-partitioned RDDs?

I'm using Spark 1.6.0.

Def_Os

1 Answer


In this case neither rdd1 nor rdd2 is partitioned:

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
rdd2 = rdd1.mapValues(lambda x: x)

rdd1.partitioner is None
## True

rdd2.partitioner is None
## True

so by definition they are not co-partitioned. While you could partition the data explicitly and then join:

n = rdd1.getNumPartitions()
rdd1part = rdd1.partitionBy(n)
rdd2part = rdd2.partitionBy(n)

rdd1part.join(rdd2part)  # rdd1part and rdd2part are co-partitioned

this would simply rearrange the DAG and won't prevent a shuffle: the partitionBy calls themselves shuffle the data, so the shuffle just happens earlier rather than being avoided.
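
For illustration, a minimal continuation of the snippet above (the printed outputs are what one would expect rather than verified results, and the exact key placement depends on the hash function):

# The partitionBy calls above are where the data actually gets shuffled.
# Afterwards both RDDs carry equivalent hash partitioners:
rdd1part.partitioner == rdd2part.partitioner
## True in recent PySpark versions (same partition count, same default hash function)

# The join can then reuse that partitioning: matching keys are already
# colocated, so the join itself doesn't need to move the input data again.
rdd1part.join(rdd2part).keys().glom().collect()
## e.g. [[], ['a'], ['b'], [], ['c'], [], ['d'], []]  (layout depends on hashing)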

See also Default Partitioning Scheme in Spark

zero323

  • Thanks, this might just solve the problem I'm facing. I'm surprised, though, that setting `numSlices` does not imply an explicit partitioning. (I guess now I understand why it's not called `numPartitions`, like in other functions.) – Def_Os Jul 24 '16 at 19:22
  • When we talk about partitions in Spark we mean two different concepts. You can check my answer to http://stackoverflow.com/q/34491219/1560062 for some explanation. – zero323 Jul 24 '16 at 19:25
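
To illustrate the distinction made in the comments, here is a hedged sketch (the 8-slice example mirrors the question; outputs are what one would expect rather than verified results):

# numSlices only controls how many blocks the input is split into;
# it does not attach a key-based partitioner to the RDD.
rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)], numSlices=8)
rdd.getNumPartitions()
## 8
rdd.partitioner is None
## True

# partitionBy hash-partitions by key and records a partitioner,
# which is the piece of information join can take advantage of.
rdd_by_key = rdd.partitionBy(8)
rdd_by_key.getNumPartitions()
## 8
rdd_by_key.partitioner is None
## False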