24

I have a list of Tuples of type : (user id, name, count).

For example,

val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
)

I'm attempting to reduce this collection to a type where each element name is counted.

So in above val x is converted to :

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))

Here is the code I am currently using :

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)

I'm attempting to use reduceByKey as it performs faster than groupByKey.

How can reduceByKey be implemented instead of above code to provide the same mapping ?

Seonghyeon Cho
  • 171
  • 1
  • 3
  • 11
blue-sky
  • 51,962
  • 152
  • 427
  • 752

3 Answers3

31

Following your code:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

You could do:

val reducedByKey = byKey.reduceByKey(_ + _)

scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)

PairRDDFunctions[K,V].reduceByKey takes an associative reduce function that can be applied to the to type V of the RDD[(K,V)]. In other words, you need a function f[V](e1:V, e2:V) : V . In this particular case with sum on Ints: (x:Int, y:Int) => x+y or _ + _ in short underscore notation.

For the record: reduceByKey performs better than groupByKey because it attemps to apply the reduce function locally before the shuffle/reduce phase. groupByKey will force a shuffle of all elements before grouping.

maasg
  • 37,100
  • 11
  • 88
  • 115
  • 1
    So, basically, reduceByKey has the same result as doing a groupBy and then applying a custom reduce function? – Savvas Parastatidis Dec 20 '15 at 15:52
  • 5
    @Savvas final result is equal, but `reduceByKey` has a O(1) memory requirement per executor while `groupByKey` needs to keep all grouped values in memory which could lead to OOM. – maasg Dec 22 '15 at 23:19
6

Your origin data structure is: RDD[(String, String, Int)], and reduceByKey can only be used if data structure is RDD[(K, V)].

val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _)       // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey()            // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)
cloud
  • 1,057
  • 7
  • 12
  • There's no such restriction that `V` must be numeric. The only requirement is that the function f(V,V)=>V must be associative. You'll get inconsistent results if it's not. – maasg Jun 06 '14 at 08:47
  • That's a mistake...I was thinking about (_ + _) at that momeent :P, updated. – cloud Jun 06 '14 at 08:59
0

The syntax is below:

reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],

which says for the same key in an RDD it takes the values (which will be definitely of same type) performs the operation provided as part of function and returns the value of same type as of parent RDD.

Prasad Khode
  • 6,602
  • 11
  • 44
  • 59
napster
  • 193
  • 2
  • 13