
I want to group a list of values per key and was doing something like this:

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two"))).groupByKey().collect.foreach(println)

(red,CompactBuffer(zero, two))
(yellow,CompactBuffer(one))

But I noticed a blog post from Databricks recommending against using groupByKey for large datasets:

Avoid GroupByKey

Is there a way to achieve the same result using reduceByKey?

I tried the following, but it concatenates all the values into one string. In my case both the key and the value are of type String, so `_ ++ _` is just string concatenation.

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two"))).reduceByKey(_ ++ _).collect.foreach(println)

(red,zerotwo)
(yellow,one)
sikara tijuhara

2 Answers


Use aggregateByKey:

import scala.collection.mutable.ListBuffer

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two")))
  .aggregateByKey(ListBuffer.empty[String])(
    (numList, num) => { numList += num; numList },                       // add each value to the per-partition buffer
    (numList1, numList2) => { numList1.appendAll(numList2); numList1 })  // merge buffers across partitions
  .mapValues(_.toList)
  .collect()

Array[(String, List[String])] = Array((yellow,List(one)), (red,List(zero, two)))

See this answer for the details on aggregateByKey, and this link for the rationale behind using a mutable collection (ListBuffer) as the accumulator.
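For comparison, here is a rough sketch of the same aggregation with immutable Lists (just to illustrate the point; the ListBuffer version avoids the extra allocations from building a new list on every step):

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two")))
  .aggregateByKey(List.empty[String])(
    (acc, v) => v :: acc,           // prepend within a partition (new cons cell per element)
    (acc1, acc2) => acc1 ::: acc2)  // concatenate partial lists across partitions
  .collect()

Note that prepending reverses the partition-local order, so the values within a group may come out in a different order than with appendAll.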

EDIT:

Is there a way to achieve the same result using reduceByKey?

The approach above is actually worse in performance than groupByKey; see the comments by @zero323 below for the details.
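For reference, the plain groupByKey version, which (per the comments below) is the best you can get with RDDs when you really need all values grouped by key, would be roughly:

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two")))
  .groupByKey()            // shuffles all values, but no map-side combine / local hashmap
  .mapValues(_.toList)     // Iterable (CompactBuffer) -> List, if a List is what you need
  .collect()

This should give the same Array[(String, List[String])] as above.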

axiom
  • Thanks @axiom, appreciate it! – sikara tijuhara Jun 01 '16 at 23:38
  • It doesn't provide any performance improvement over `groupByKey`. It is actually significantly less efficient due to higher pressure on GC. – zero323 Jun 02 '16 at 08:16
  • @zero323 Have added an edit, but wouldn't that be a function of key distribution? Also, is the Iterable provided by groupByKey similar in nature to the one available in a reducer in the MR world (an abstraction over a stream of elements, where elements may not all be in memory)? – axiom Jun 02 '16 at 08:21
  • OK, so there are a few problems here. For any (key, values) pair all elements have to fit into memory. This is true both in case of your solution and groupByKey. Unlike groupByKey, your solution applies map-side combine, which doesn't reduce traffic but requires creation of a local hashmap to store pairs. Spark explicitly avoids that in groupByKey (see the JIRA linked by Climbage in http://stackoverflow.com/q/33221713/1560062) and source https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L501-L503 – zero323 Jun 02 '16 at 08:29
  • So given current implementation groupByKey is the best thing you can get if you really need groups by key. There are other approaches (external sorting for example) but these won't result in the same structure. – zero323 Jun 02 '16 at 08:31
  • I see, thanks. Seems like this answer is misleading and should go, please feel free to add an updated answer. – axiom Jun 02 '16 at 08:36
  • @zero323 or let me do the extra work; I'll summarize the links you have shared and update the answer. Really appreciate the pointers. – axiom Jun 02 '16 at 08:42
  • I think clarification is enough :) – zero323 Jun 02 '16 at 10:33
Alternatively, wrap each value in a single-element List and concatenate the lists with reduceByKey:

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two")))
  .map(t => (t._1, List(t._2)))   // (key, value) -> (key, List(value))
  .reduceByKey(_ ::: _)           // concatenate the per-key lists
  .collect()

Array[(String, List[String])] = Array((red,List(zero, two)), (yellow,List(one)))
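A slightly more compact spelling of the same idea, as a sketch: mapValues keeps the partitioner of the parent RDD (plain map does not), though with parallelize there is no partitioner to preserve anyway.

sc.parallelize(Array(("red", "zero"), ("yellow", "one"), ("red", "two")))
  .mapValues(List(_))      // wrap each value in a single-element List, key untouched
  .reduceByKey(_ ::: _)    // concatenate per-key lists
  .collect()

The caveat from the comments on the other answer applies here too: every value is still shuffled, and the map-side combine adds overhead compared to groupByKey.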