
I want to even out the partition sizes of RDDs/DataFrames in Spark to get rid of straggler tasks that slow my job down. I can do so using repartition(n_partition), which creates partitions of fairly uniform size. However, that involves an expensive shuffle.

I know that coalesce(n_desired_partitions) is a cheaper alternative that avoids shuffling and instead merges partitions on the same executor. However, it's not clear to me whether this function tries to create partitions of roughly uniform size, or simply merges input partitions without regard to their sizes.
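For reference, this is how the two calls compare (a minimal sketch of the standard RDD API, assuming an existing rdd: RDD[Int]):

    // Sketch: rdd is assumed to be an existing RDD[Int].
    val shuffled = rdd.repartition(2)              // full shuffle, roughly uniform partition sizes
    val merged   = rdd.coalesce(2)                 // no shuffle: merges existing partitions locally
    val balanced = rdd.coalesce(2, shuffle = true) // equivalent to repartition(2)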

For example, let's say we have an RDD of the integers in the range [1,12] in three partitions as follows: [(1,2,3,4,5,6,7,8), (9,10), (11,12)]. Let's say these are all on the same executor.

Now I call rdd.coalesce(2). Will the algorithm that powers coalesce know to merge the two small partitions (because they're smaller and we want balanced partition sizes), rather than just merging two arbitrary partitions?
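One way to check this empirically (my own sketch, not from the original post: the custom partitioner below just forces the skewed layout, and glom is used to read back the per-partition sizes):

    import org.apache.spark.Partitioner

    // Force the skewed layout [(1..8), (9,10), (11,12)] with a custom
    // partitioner on a pair RDD, then drop the keys.
    val skewed = sc.parallelize(1 to 12)
      .map(i => (i, i))
      .partitionBy(new Partitioner {
        def numPartitions: Int = 3
        def getPartition(key: Any): Int = key.asInstanceOf[Int] match {
          case k if k <= 8  => 0
          case k if k <= 10 => 1
          case _            => 2
        }
      })
      .values

    // Per-partition sizes before and after coalescing.
    skewed.glom().map(_.length).collect()             // Array(8, 2, 2)
    skewed.coalesce(2).glom().map(_.length).collect() // reveals whether the two
                                                      // small partitions were merged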

Discussion of this topic elsewhere

According to this presentation (skip to 7:27), Netflix's big data team needed to implement a custom coalesce function to balance partition sizes. See also SPARK-14042.

Why this question's not a duplicate

There is a more general question about the differences between repartition and coalesce here, but nobody there explains whether the algorithm that powers coalesce tries to balance partition sizes.

  • Possible duplicate of [Spark - repartition() vs coalesce()](http://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce) – Shivansh Dec 08 '16 at 10:09

1 Answer


So actually repartition is nothing special; its definition looks like this:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}

So repartition is simply coalesce with shuffle = true, whereas when you call coalesce directly, shuffle defaults to false, so the data is not shuffled.
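A quick way to see the effect of the shuffle flag (my own sketch, not part of the original answer; coalesce without shuffle can only reduce the number of partitions):

    val rdd = sc.parallelize(1 to 12, 4)
    rdd.coalesce(2).getNumPartitions                 // 2: partitions merged, no shuffle
    rdd.coalesce(8).getNumPartitions                 // still 4: cannot grow without a shuffle
    rdd.coalesce(8, shuffle = true).getNumPartitions // 8: shuffles, like repartition(8)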

For example, say you have 2 cluster nodes and each holds 2 partitions. If you call rdd.coalesce(2), it will merge the local partitions on each node. If you call coalesce(1) instead, a shuffle is needed, because the other 2 partitions are on another node. So in your case it may join the partitions local to each node, and if one node holds fewer elements, your partition sizes will not be uniform.

OK, following your edit to the question, I tried to do the same, as follows:

    val data = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12))
    data.getNumPartitions
    res2: Int = 4
    data.mapPartitionsWithIndex{ case (a, b) => println("partitionssss" + a); b.map(y => println("dataaaaaaaaaaaa" + y)) }.count

The output of the above code (a screenshot in the original post, omitted here) shows which partition each element lands in.

And now I coalesce the 4 partitions to 2 and run the same code on that RDD to check how Spark coalesces the data. The output:

(screenshot of the coalesced output, omitted here)

Now you can easily see that Spark distributed the data equally across both partitions, 6 and 6, even though before the coalesce the number of elements was not the same in all partitions.

    val coal = data.coalesce(2)
    coal.getNumPartitions
    res4: Int = 2
    coal.mapPartitionsWithIndex{ case (a, b) => println("partitionssss" + a); b.map(y => println("dataaaaaaaaaaaa" + y)) }.count
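As an aside, glom gives a more direct view of partition contents than println inside mapPartitionsWithIndex (my own sketch, not part of the original answer; the example output reflects the 6-6 split described above):

    // Collect each partition into an array on the driver and print it.
    coal.glom().collect().foreach(p => println(p.mkString("partition: [", ",", "]")))
    // e.g. partition: [1,2,3,4,5,6]
    //      partition: [7,8,9,10,11,12]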
  • Sorry dude @shivansh dude :p – Sandeep Purohit Dec 08 '16 at 09:57
  • Thanks for your answer, but I don't feel it answers the question. It's still not clear how `coalesce` behaves when shuffle is set to false. I updated the question with an example that I hope makes it more clear. – conradlee Dec 08 '16 at 11:25
  • @conradlee Thanks..!!! It's interesting; I also did a small POC and edited my answer, so check it out, maybe now it's clearer..!! – Sandeep Purohit Dec 08 '16 at 12:23