27

As everyone knows, partitioners in Spark have a huge performance impact on any "wide" operation, so they are usually customized. I was experimenting with the following code:

import org.apache.spark.HashPartitioner

// rdd1 gets an explicit partitioner; rdd2 is left with none.
val rdd1 =
  sc.parallelize(1 to 50).keyBy(_ % 10)
    .partitionBy(new HashPartitioner(10))
val rdd2 =
  sc.parallelize(200 to 230).keyBy(_ % 13)

val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)

val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)

I see that cogroup() always yields an RDD with the custom partitioner, but union() doesn't: it always reverts to the default. This is counterintuitive, since we usually assume that a PairRDD should use its first element as the partition key. Is there a way to "force" Spark to merge two PairRDDs so that they use the same partition key?

tribbloid

2 Answers

50

union is a very efficient operation, because it doesn't move any data around. If rdd1 has 10 partitions and rdd2 has 20 partitions then rdd1.union(rdd2) will have 30 partitions: the partitions of the two RDDs put after each other. This is just a bookkeeping change, there is no shuffle.
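A quick sketch of this bookkeeping, assuming a SparkContext sc is in scope (the variable names are just for illustration):

import org.apache.spark.HashPartitioner

val a = sc.parallelize(1 to 50).keyBy(_ % 10).partitionBy(new HashPartitioner(10))
val b = sc.parallelize(1 to 100).keyBy(_ % 20).partitionBy(new HashPartitioner(20))

// Partition counts simply add up: 10 + 20 = 30. No shuffle takes place.
println(a.union(b).getNumPartitions)  // 30
println(a.union(b).partitioner)       // None -- the partitioner is dropped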

But it necessarily discards the partitioner. A partitioner is constructed for a given number of partitions, and the resulting RDD has a number of partitions that is different from both rdd1 and rdd2.

After taking the union you can run partitionBy to shuffle the data and organize it by key. (Plain repartition redistributes the data without regard to keys; partitionBy is what organizes a pair RDD by key.)
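A minimal sketch of that fix-up, reusing unioned from the question:

import org.apache.spark.HashPartitioner

// Shuffles everything so that equal keys end up in the same partition again.
val byKey = unioned.partitionBy(new HashPartitioner(10))
println(byKey.partitioner)  // Some(org.apache.spark.HashPartitioner@...)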


There is one exception to the above. If rdd1 and rdd2 have the same partitioner (with the same number of partitions), union behaves differently. It will join the partitions of the two RDDs pairwise, giving it the same number of partitions as each of the inputs had. This may involve moving data around (if the partitions were not co-located) but will not involve a shuffle. In this case the partitioner is retained. (The code for this is in PartitionerAwareUnionRDD.scala.)
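A sketch of this special case, again assuming a SparkContext sc (Spark 1.3+). Both inputs share one partitioner object, so the union keeps it:

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(10)
val x = sc.parallelize(1 to 50).keyBy(_ % 10).partitionBy(p)
val y = sc.parallelize(51 to 100).keyBy(_ % 10).partitionBy(p)

val u = x.union(y)           // becomes a PartitionerAwareUnionRDD under the hood
println(u.partitioner)       // Some(org.apache.spark.HashPartitioner@...)
println(u.getNumPartitions)  // 10, not 20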

Daniel Darabos
  • There's actually a partitioner-aware union RDD that I think is supposed to be used automatically in cases where partitioning could be preserved; not sure why it's not applied here. See https://github.com/apache/spark/blob/e0628f2fae7f99d096f9dd625876a60d11020d9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L123 and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala – Josh Rosen Apr 30 '15 at 22:09
  • Wow, cool! Never knew about that. Looks like it's only used when both RDDs have the same partitioner. I'll add that to the answer, thanks! – Daniel Darabos Apr 30 '15 at 22:13
  • Thanks a lot! This is a very important optimization. BTW if this is not optimal for all cases I can always write a zip + in-partition union anyway – tribbloid May 15 '15 at 01:44
  • Excellent answer Daniel. Thank you. – human Sep 04 '17 at 12:14
  • Very interesting! Is there any specific way to ensure that they will have the same partitioner and the same number of partitions (without repartition)? I am performing iterative unions of DataFrames (bigDF.union(oneRowDF) iteratively) with PySpark. – drkostas Jun 07 '18 at 17:02
  • Almost everything uses a HashPartitioner. So if your DataFrames have the same number of partitions, I would hope that would be enough. You can just print `df.partitioner` and `df.partitions` to see what's happening. – Daniel Darabos Jun 07 '18 at 20:26
  • Just to add that the correct commands are `df.rdd.partitioner` and `df.rdd.getNumPartitions`. Do you have any idea why my DFs don't have any partitioner (None) even when I am repartitioning them? – drkostas Jun 08 '18 at 11:45
  • Ah, sorry, I was completely wrong. This does not apply to DataFrames at all. You need a key for partitioning. DataFrames don't have keys. Also `union` may be completely different for them than it is for RDDs. Sorry for misleading you. – Daniel Darabos Jun 12 '18 at 18:25
  • If your `oneRowDF` really only has one row, maybe you could try just collecting all of them locally and building a more reasonable sized DF from them before going for the union. (I haven't tried.) – Daniel Darabos Jun 12 '18 at 18:26
2

This is no longer true. Iff two RDDs have exactly the same partitioner and number of partitions, the unioned RDD will also have that same partitioner and number of partitions. This was introduced in https://github.com/apache/spark/pull/4629 and incorporated into Spark 1.3.

Joel Croteau