
I am trying to group the elements of an RDD that I have created. One simple but expensive way is to use groupByKey(), but recently I learned that combineByKey() can do this work more efficiently. My RDD is very simple. It looks like this:

(1,5)
(1,8)
(1,40)
(2,9)
(2,20)
(2,6)
This is how I currently group it:

val grouped_elements = first_RDD.groupByKey().mapValues(x => x.toList)

The result is:

(1,List(5,8,40))
(2,List(9,20,6))

I want to group them based on the first element (the key).

Can anyone help me do this with the combineByKey() function? I am really confused by combineByKey().

thebluephantom
    Does this answer your question? [Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?](https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd) – user10938362 Mar 26 '20 at 16:35
  • It is somewhat close to my question, but not exactly what I want. Thanks for your attention anyway. –  Mar 26 '20 at 19:04

1 Answer


To begin with, take a look at the API docs:

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

So it accepts three functions, which I have defined below:

scala> val createCombiner = (v:Int) => List(v)
createCombiner: Int => List[Int] = <function1>

scala> val mergeValue = (a:List[Int], b:Int) => a.::(b)
mergeValue: (List[Int], Int) => List[Int] = <function2>

scala> val mergeCombiners = (a:List[Int],b:List[Int]) => a.++(b)
mergeCombiners: (List[Int], List[Int]) => List[Int] = <function2>

Once you define these, you can use them in your combineByKey call as below:

scala> val list = List((1,5),(1,8),(1,40),(2,9),(2,20),(2,6))
list: List[(Int, Int)] = List((1,5), (1,8), (1,40), (2,9), (2,20), (2,6))

scala> val temp = sc.parallelize(list)
temp: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:30

scala> temp.combineByKey(createCombiner,mergeValue, mergeCombiners).collect
res27: Array[(Int, List[Int])] = Array((1,List(8, 40, 5)), (2,List(20, 9, 6)))

Please note that I tried this out in the Spark shell, so you can see the output below each executed command. That should help build your understanding.
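If you want the values in the same order that your groupByKey() version produced (e.g. (1,List(5, 8, 40))), one option is to append in mergeValue instead of prepending. The following is only a sketch, assuming first_RDD is the RDD[(Int, Int)] from your question:

// Sketch: append values so that within-partition order is kept
val grouped = first_RDD.combineByKey(
  (v: Int) => List(v),                   // createCombiner: start a new list for a key
  (acc: List[Int], v: Int) => acc :+ v,  // mergeValue: append the next value for the key
  (a: List[Int], b: List[Int]) => a ++ b // mergeCombiners: concatenate lists from different partitions
)

grouped.collect should then give something close to Array((1,List(5, 8, 40)), (2,List(9, 20, 6))), although the order in which partial lists from different partitions are merged is not guaranteed.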

Amit