2

I have an RDD of (String,String,Int).

  1. I want to reduce it based on the first two strings
  2. And Then based on the first String I want to group the (String,Int) and sort them
  3. After sorting I need to group them into small groups each containing n elements.

I have done the code below. The problem is the number of elements in the step 2 is very large for a single key and the reduceByKey(x++y) takes a lot of time.

//Input
val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1), 
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))

val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))

// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y) 

// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList)) 

Problem is the "c1" has lot of unique entries like b1 ,b2....million and reduceByKey is killing time because all the values are going to single node. Is there a way to achieve this more efficiently?

// output
 Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
Community
  • 1
  • 1
Knight71
  • 2,927
  • 5
  • 37
  • 63
  • You can try repartitioning data after the first reduceByKey, then use combineByKey instead map-mapValues-reduceByKey, I think it could help to balance the work load – rhernando Jan 29 '16 at 12:15

1 Answers1

3

There at least few problems with a way you group your data. The first problem is introduced by

 mapValues(x => ArrayBuffer(x))

It creates a large amount of mutable objects which provide no additional value since you cannot leverage their mutability in the subsequent reduceByKey

reduceByKey((x, y) => x ++ y) 

where each ++ creates a new collection and neither argument can be safely mutated. Since reduceByKey applies map side aggregation situation is even worse and pretty much creates GC hell.

Is there a way to achieve this more efficiently?

Unless you have some deeper knowledge about data distribution which can be used to define smarter partitioner the simplest improvement is to replace mapValues + reduceByKey with simple groupByKey:

val r3 = r2.groupByKey

It should be also possible to use a custom partitioner for both reduceByKey calls and mapPartitions with preservesPartitioning instead of map.

class FirsElementPartitioner(partitions: Int)
    extends org.apache.spark.Partitioner {
  def numPartitions  = partitions
  def getPartition(key: Any): Int = {
    key.asInstanceOf[(Any, Any)]._1.## % numPartitions
  }
}

val r2 = r1
  .reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
  .mapPartitions(iter => iter.map(x => ((x._1._1), (x._1._2, x._2))), true)

// No shuffle required here.
val r3 = r2.groupByKey

It requires only a single shuffle and groupByKey is simply a local operations:

r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
//  |  MapPartitionsRDD[40] at mapPartitions at <console>:35 []
//  |  ShuffledRDD[39] at reduceByKey at <console>:34 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:28 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:26 []
zero323
  • 322,348
  • 103
  • 959
  • 935
  • Say if I have a custom partition . After partition , a single partition might contain large data. Will this partition be residing in single node or mulitple nodes ? – Knight71 Jan 29 '16 at 12:41
  • 1
    Partition resides always on a single node. Unless you want to play with some workarounds like custom RDD with surrogate keys there is no workaround. Technically the only practical problem should large amount of values for a single key. Everything else can be spilled to disk if required. – zero323 Jan 29 '16 at 12:53
  • Thanks for the custom partitioning . I was able to create mulitple partitions with my data and parallelized the task efficiently – Knight71 Feb 01 '16 at 09:25