
I have this simple Spark program, and I am wondering why all the data ends up in one partition.

val l = List((30002,30000), (50006,50000), (80006,80000), 
             (4,0), (60012,60000), (70006,70000), 
             (40006,40000), (30012,30000), (30000,30000),
             (60018,60000), (30020,30000), (20010,20000), 
             (20014,20000), (90008,90000), (14,0), (90012,90000),
             (50010,50000), (100008,100000), (80012,80000),
             (20000,20000), (30010,30000), (20012,20000), 
             (90016,90000), (18,0), (12,0), (70016,70000), 
             (20,0), (80020,80000), (100016,100000), (70014,70000),
             (60002,60000), (40000,40000), (60006,60000), 
             (80000,80000), (50008,50000), (60008,60000), 
             (10002,10000), (30014,30000), (70002,70000),
             (40010,40000), (100010,100000), (40002,40000),
             (20004,20000), 
             (10018,10000), (50018,50000), (70004,70000),
             (90004,90000), (100004,100000), (20016,20000))

val l_rdd = sc.parallelize(l, 2)

// print each item and index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
   iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)

// reduce on the second element of the list.
// alternatively you can use aggregateByKey  
val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

// print the reduced results along with its partition index
l_reduced.mapPartitionsWithIndex((index, iter) => {
      iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)

When you run this, you will see that the data (l_rdd) is distributed into two partitions. After the reduce, the resultant RDD (l_reduced) also has two partitions, but all the data is in one partition (index 0) and the other one is empty. This happens even when the data is huge (a few GB). Shouldn't l_reduced also be distributed into two partitions?

Jacek Laskowski
Shirish Kumar
2 Answers

val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

With reference to the above snippet, the RDD is keyed by the second field of each tuple, and all of those numbers are even (they all end in 0).

reduceByKey uses a HashPartitioner by default, and the partition for a record is decided by the following function:

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

And the Utils.nonNegativeMod is defined as follows:

 def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

Let us see what happens when we apply the above two pieces of logic to your input:

scala> l.map(_._2.hashCode % 2) // numPartitions = 2
res10: List[Int] = List(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

Therefore, all of your records end up in partition 0.
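The two Spark functions above can be replayed in plain Scala without a cluster. The sketch below copies nonNegativeMod verbatim and applies it to the distinct keys from the question (the helper getPartition here is a standalone stand-in for Spark's method, not Spark itself); since an Int's hashCode is the value itself and every key is even, every assignment comes out 0:

```scala
// Plain-Scala replica of Spark's default partition assignment (no cluster needed).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// Standalone stand-in for HashPartitioner.getPartition with a non-null Int key.
def getPartition(key: Int, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)

// The distinct keys appearing as the second field in the question's data.
val keys = List(30000, 50000, 80000, 0, 60000, 70000, 40000, 20000,
                90000, 100000, 10000)

val assignments = keys.map(getPartition(_, 2))
println(assignments)  // every key is even, so key % 2 == 0 for all of them
```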

You can solve this problem by a repartition:

val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a}).repartition(2)

which gives:

(0,100000,4)
(0,10000,2)
(0,0,5)
(0,20000,6)
(0,60000,5)
(0,80000,4)
(1,50000,4)
(1,30000,6)
(1,90000,4)
(1,70000,5)
(1,40000,4)

Alternatively, you can create a custom partitioner.
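As a hedged sketch of what such a custom partitioner could look like: in real Spark you would extend org.apache.spark.Partitioner; the local trait below only mirrors its two abstract members so the logic can be shown standalone. The idea of dividing out the common factor of 10000 (and the class name) are illustrative assumptions, not a canonical solution:

```scala
// Mirrors the two abstract members of org.apache.spark.Partitioner,
// so the partitioning logic can be demonstrated without a Spark dependency.
trait PartitionerLike {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Hypothetical partitioner: divide out the common factor of 10000 first,
// so consecutive key "buckets" land on different partitions.
class TenThousandsPartitioner(val numPartitions: Int) extends PartitionerLike {
  def getPartition(key: Any): Int = key match {
    case k: Int => (k / 10000) % numPartitions
    case _      => 0
  }
}

val p = new TenThousandsPartitioner(2)
val keys = List(0, 10000, 20000, 30000, 40000, 50000)
println(keys.map(p.getPartition))  // alternates between partitions 0 and 1
```

In real Spark code you would also override equals and hashCode on the partitioner, since Spark uses them to decide whether two RDDs are co-partitioned.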

axiom

0

Unless you specify otherwise, the partitioning will be done based on the hashcode of the keys concerned, with the assumption that the hashcodes will result in a relatively even distribution. In this case, your hashcodes are all even, and therefore will all go into partition 0.

If this is truly representative of your data set, there is an overload of reduceByKey that takes a partitioner as well as the reduce function. I would suggest providing an alternative partitioning algorithm for a dataset like this.

Joe C