I have this simple Spark program, and I am wondering why all the data ends up in one partition after reduceByKey.
val l = List((30002,30000), (50006,50000), (80006,80000),
    (4,0), (60012,60000), (70006,70000),
    (40006,40000), (30012,30000), (30000,30000),
    (60018,60000), (30020,30000), (20010,20000),
    (20014,20000), (90008,90000), (14,0), (90012,90000),
    (50010,50000), (100008,100000), (80012,80000),
    (20000,20000), (30010,30000), (20012,20000),
    (90016,90000), (18,0), (12,0), (70016,70000),
    (20,0), (80020,80000), (100016,100000), (70014,70000),
    (60002,60000), (40000,40000), (60006,60000),
    (80000,80000), (50008,50000), (60008,60000),
    (10002,10000), (30014,30000), (70002,70000),
    (40010,40000), (100010,100000), (40002,40000),
    (20004,20000),
    (10018,10000), (50018,50000), (70004,70000),
    (90004,90000), (100004,100000), (20016,20000))
// distribute the list over two partitions
val l_rdd = sc.parallelize(l, 2)

// print each item together with the index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
    iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)

// key each pair by its second element and concatenate the first elements per key;
// alternatively you can use aggregateByKey
val l_reduced = l_rdd.map(x => {
    (x._2, List(x._1))
}).reduceByKey((a, b) => {b ::: a})

// print each key and the size of its reduced list, along with the partition index
l_reduced.mapPartitionsWithIndex((index, iter) => {
    iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)
When you run this, you will see that the data (l_rdd) is distributed across two partitions. After the reduce, the resulting RDD (l_reduced) also has two partitions, but all the data is in one partition (index 0) and the other one is empty. This happens even if the data is huge (a few GBs). Shouldn't l_reduced also be distributed across two partitions?
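One possible explanation, sketched without Spark: when no partitioner is passed, reduceByKey falls back to hash partitioning, which places a key in partition key.hashCode mod numPartitions (made non-negative). The hash code of an Int is the value itself, and every key above (0, 10000, ..., 100000) is even, so with two partitions they would all map to index 0. The object name PartitionSketch and the helpers nonNegativeMod and partitionOf below are hypothetical names used only for illustration; they imitate the hash-partitioning rule and are not Spark's own code.

```scala
object PartitionSketch {
  // Hypothetical helper: a modulo that never returns a negative result.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Hypothetical helper imitating hash partitioning:
  // partition = non-negative (key.hashCode mod numPartitions).
  def partitionOf(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  // The distinct keys produced by the map step (second tuple elements).
  val keys: List[Int] =
    List(0, 10000, 20000, 30000, 40000, 50000, 60000, 70000, 80000, 90000, 100000)

  def main(args: Array[String]): Unit = {
    // With 2 partitions every key is even, so all 11 keys land in partition 0.
    keys.foreach(k => println(s"key=$k -> partition ${partitionOf(k, 2)} of 2"))

    // With 3 partitions the same keys spread over partitions 0, 1 and 2.
    keys.foreach(k => println(s"key=$k -> partition ${partitionOf(k, 3)} of 3"))
  }
}
```

If this is indeed the cause, a partition count that shares no common factor with the keys (for example reduceByKey((a, b) => b ::: a, 3)) or a custom Partitioner should spread the keys more evenly. The data size would not matter, since placement depends only on the key values.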