1

To reduce shuffling during the joining of two RDDs, I decided to partition them using HashPartitioner first. Here is how I do it. Am I doing it correctly, or is there a better way to do this?

import org.apache.spark.HashPartitioner

val rddA = ...
val rddB = ...

val numOfPartitions = rddA.getNumPartitions

val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions))
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions))

val rddAB = rddApartitioned.join(rddBpartitioned)
MetallicPriest

2 Answers

5

> To reduce shuffling during the joining of two RDDs,

It is a surprisingly common misconception that repartitioning reduces or even eliminates shuffles. It doesn't. Repartitioning is a shuffle in its purest form. It doesn't save time, bandwidth, or memory.
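
One way to see this for yourself is to inspect the lineage of a partitioned RDD. The following is a minimal sketch, assuming a local master and made-up sample data (the names `pairs`, `partitioned`, and `lineage` are illustrative, not from the question); it can be pasted into spark-shell or run as a script:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: a local context for illustration; in spark-shell the existing one is reused.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitionBy-is-a-shuffle").setMaster("local[2]"))

// Hypothetical sample data standing in for rddA.
val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
val partitioned = pairs.partitionBy(new HashPartitioner(4))

// The lineage contains a ShuffledRDD: partitionBy is itself a shuffle.
val lineage = partitioned.toDebugString
println(lineage)
```

The printed lineage should list a `ShuffledRDD` on top of the parallelized collection, which is the shuffle that `partitionBy` introduces.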

The rationale behind partitioning proactively is different: it allows you to shuffle once and reuse the resulting state to perform multiple by-key operations without additional shuffles (though, as far as I am aware, not necessarily without additional network traffic, since co-partitioning doesn't imply co-location, except when the shuffles occurred in a single action).

So your code is correct, but in a case where you join once it doesn't buy you anything.
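
To illustrate the case where proactive partitioning can pay off, here is a rough sketch in which one partitioned RDD is joined more than once. The sample data, the third RDD `rddC`, and the local master are assumptions made up for this example; the partitioner is passed to `join` explicitly so that both joins use it regardless of Spark's default partitioner selection:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: a local context for illustration; in spark-shell the existing one is reused.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("reuse-partitioner").setMaster("local[2]"))

// Hypothetical sample data.
val rddA = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
val rddB = sc.parallelize(Seq(1 -> 10.0, 2 -> 20.0))
val rddC = sc.parallelize(Seq(2 -> true, 3 -> false))

val partitioner = new HashPartitioner(rddA.getNumPartitions)

// rddA is shuffled once here; persist keeps the partitioned data for later jobs.
val partitionedA = rddA.partitionBy(partitioner).persist()

// Both joins use the same partitioner, so partitionedA is not shuffled again;
// only rddB and rddC need to be shuffled to match it.
val ab = partitionedA.join(rddB, partitioner)
val ac = partitionedA.join(rddC, partitioner)

val abRows = ab.collect().toMap
val acRows = ac.collect().toMap
```

With a single join, the same amount of data is shuffled either way, which is why the explicit `partitionBy` buys nothing in that case.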

user10938362
  • Good observation :) In my case, I also do a sortByKey afterwards, so I guess it's helping. – MetallicPriest Mar 21 '19 at 13:47
  • If `sortByKey` is applied on `rddAB`, then it makes no difference at all. If it is applied on `rddApartitioned` / `rddBpartitioned` then it could provide some benefits. – user10938362 Mar 21 '19 at 20:09
  • @user10938362 Does it require both RDDs to be `partitionBy`ed? What happens if I only `partitionBy` `rddA` and not `rddB`? – jack Aug 09 '20 at 11:44
0

Just one comment: it is better to append .persist() after .partitionBy if there are multiple actions on rddApartitioned and rddBpartitioned. Otherwise, each action re-evaluates their entire lineage, so the hash partitioning takes place again and again.

val rddApartitioned = rddA.partitionBy(new HashPartitioner(numOfPartitions)).persist()
val rddBpartitioned = rddB.partitionBy(new HashPartitioner(numOfPartitions)).persist()
jack