20

I always use reduceByKey when I need to group data in RDDs, because it performs a map-side reduce before shuffling data, which often means that less data gets shuffled around and I thus get better performance. Even when the map-side reduce function collects all values and does not actually reduce the data amount, I still use reduceByKey, because I'm assuming that the performance of reduceByKey will never be worse than that of groupByKey. However, I'm wondering if this assumption is correct, or if there are indeed situations where groupByKey should be preferred?
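
For clarity, here is a minimal sketch of the pattern I mean (the SparkContext sc and the sample pair RDD are assumed purely for illustration):

// Hypothetical sample data, for illustration only
val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))

// Grouping with groupByKey: all values for a key end up in one Iterable
val grouped = pairs.groupByKey()

// "Grouping" with reduceByKey: collect each key's values into a list and concatenate.
// The map-side combine still runs, but the total amount of data does not shrink.
val viaReduce = pairs.mapValues(List(_)).reduceByKey(_ ++ _)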

Glennie Helles Sindholt
  • From the answers I have gotten below (and thank you for those), @eliasah says that `groupByKey` is just syntax sugar whereas @climbage thinks that `reduceByKey` might be slightly slower if I use it to replicate `groupByKey` functionality. I think I will actually try to test the two functions on some examples :) – Glennie Helles Sindholt Oct 19 '15 at 20:19
  • http://stackoverflow.com/questions/30825936/when-should-groupbykey-api-used-in-spark-programming – Knight71 Oct 20 '15 at 06:03
  • The only time I've needed to use groupByKey is for calculations on data samples that depend on the previous value. A precomputed running total would be one example; GPS distance another; etc. – pestilence669 Jul 10 '16 at 05:34

3 Answers

18

I believe there are other aspects of the problem ignored by climbage and eliasah:

  • code readability
  • code maintainability
  • codebase size

If the operation doesn't reduce the amount of data, it has to be, one way or another, semantically equivalent to groupByKey. Let's assume we have an RDD[(Int, String)]:

import scala.util.Random
Random.setSeed(1)

// Random alphanumeric string of length 0-9
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")

// 20 pairs with keys in 0..4 and random string values
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))

and we want to concatenate all strings for a given key. With groupByKey it is pretty simple:

rdd.groupByKey.mapValues(_.mkString(""))

A naive solution with reduceByKey looks like this:

rdd.reduceByKey(_ + _)

It is short and arguably easy to understand, but it suffers from two issues:

  • it is extremely inefficient, since it creates a new String object every time*
  • it suggests that the operation you perform is less expensive than it really is, especially if you analyze only the DAG or the debug string (the lineage can be inspected with toDebugString, as sketched below)
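
For reference, this is how one would look at that debug string; it shows the chain of RDDs, not the cost of the reduce function, so the useless combine is invisible there:

// Inspect the lineage of each variant; both contain a shuffle stage,
// and neither reveals that the reduce function does not shrink the data.
println(rdd.reduceByKey(_ + _).toDebugString)
println(rdd.groupByKey.mapValues(_.mkString("")).toDebugString)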

To deal with the first problem, we need a mutable data structure:

import scala.collection.mutable.StringBuilder

rdd.combineByKey[StringBuilder](
    (s: String) => new StringBuilder(s),                        // createCombiner
    (sb: StringBuilder, s: String) => sb ++= s,                 // mergeValue
    (sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2) // mergeCombiners
).mapValues(_.toString)

It still suggests something other than what is really going on and is quite verbose, especially if repeated multiple times in your script. You can of course extract the anonymous functions:

val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) => 
  sb1.append(sb2)

rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)

but at the end of the day it still means additional effort to understand this code, increased complexity, and no real added value. One thing I find particularly troubling is the explicit inclusion of mutable data structures. Even if Spark handles almost all of the complexity, it means we no longer have elegant, referentially transparent code.

My point is that if you really reduce the amount of data, by all means use reduceByKey. Otherwise you make your code harder to write, harder to analyze, and gain nothing in return.
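
For contrast, a minimal sketch of a case where the function genuinely shrinks the data, reusing the rdd defined above (the per-key count is just an illustrative aggregation):

// Number of values per key: the map-side combine collapses each partition
// to at most one record per key before the shuffle.
val countsPerKey = rdd.mapValues(_ => 1L).reduceByKey(_ + _)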

Note:

This answer is focused on the Scala RDD API. The current Python implementation is quite different from its JVM counterpart and includes optimizations which provide a significant advantage over a naive reduceByKey implementation in the case of groupBy-like operations.

For Dataset API see DataFrame / Dataset groupBy behaviour/optimization.


* See Spark performance for Scala vs Python for a convincing example

zero323
7

reduceByKey and groupByKey both use combineByKey with different combine/merge semantics.

The key difference I see is that groupByKey passes a flag (mapSideCombine=false) to the shuffle engine. Judging by the issue SPARK-772, this is a hint to the shuffle engine not to run the map-side combiner when the data size isn't going to change.

So I would say that if you are trying to use reduceByKey to replicate groupByKey, you might see a slight performance hit.
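
For illustration, a rough sketch of replicating groupByKey-like behaviour through the combineByKey overload that exposes the mapSideCombine flag (this reuses the rdd from the previous answer; the buffer type and partitioner choice are arbitrary):

import org.apache.spark.HashPartitioner
import scala.collection.mutable.ArrayBuffer

// Build a per-key buffer of values, but skip the map-side combine,
// since the amount of data would not shrink anyway.
val grouped = rdd.combineByKey(
  (v: String) => ArrayBuffer(v),                                   // createCombiner
  (buf: ArrayBuffer[String], v: String) => buf += v,               // mergeValue
  (b1: ArrayBuffer[String], b2: ArrayBuffer[String]) => b1 ++= b2, // mergeCombiners
  new HashPartitioner(rdd.partitions.length),
  mapSideCombine = false
)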

Mike Park
3

I won't reinvent the wheel; according to the code documentation, the groupByKey operation groups the values for each key in the RDD into a single sequence, and it also allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner.

This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using aggregateByKey or reduceByKey will provide much better performance.

Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in an OutOfMemoryError.

As a matter of fact, I prefer the combineByKey operation, but it's sometimes hard to understand the concepts of the combiner and the merger if you are not very familiar with the map-reduce paradigm. For this, you can read the Yahoo map-reduce bible here, which explains this topic well.
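
To make the combiner and merger roles concrete, here is a minimal sketch of a per-key average with combineByKey (the SparkContext sc and the sample RDD nums are assumed purely for illustration):

// Hypothetical sample data, for illustration only
val nums = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

val avgByKey = nums.combineByKey(
  (v: Double) => (v, 1L),                                              // createCombiner: start a (sum, count)
  (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),       // mergeValue: fold one value in
  (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge partial results
).mapValues { case (sum, count) => sum / count }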

For more information, I advise you to read the PairRDDFunctions code.

eliasah
  • I understand the possible problems associated with `groupByKey` (such as too many values for a given key) - the question was if there are times when `groupByKey` is actually the better choice. You mention that the partitioning of the resulting key-value pair can be controlled when using `groupByKey`, but it can also be controlled with `reduceByKey`, so that doesn't seem to be a reason for using `groupByKey`, or am I misunderstanding you? – Glennie Helles Sindholt Oct 19 '15 at 20:00
  • That's completely correct, you can consider `groupByKey` as syntax sugar. If you can avoid it, it's always better to use `aggregateByKey`, `reduceByKey`, or `combineByKey` – eliasah Oct 19 '15 at 20:03
  • @GlennieHellesSindholt you don't seem to be convinced. – eliasah Oct 19 '15 at 21:42
  • How could `combineByKey` avoid the OOM issue? The results are the same in size. – shuaiyuancn Jun 24 '16 at 14:52
  • @shuaiyuancn I'm not sure I understand your question without context – eliasah Jun 24 '16 at 15:05
  • @eliasah You said "As a matter of fact, I prefer the combineByKey operation" after saying there could be an OOM issue with `groupByKey`. `combineByKey` should have the same issue in this case (and it complicates the logic) – shuaiyuancn Jun 24 '16 at 15:43
  • `combineByKey` allows monoidal operations to be performed, which is not the case with `groupByKey`. So actually, no, it shouldn't have quite the same issues. – eliasah Jun 24 '16 at 15:46
  • OP said "... does not actually reduce the data amount" so yes, they are the same if `combineByKey` is not more expensive. – shuaiyuancn Jun 27 '16 at 13:29
  • @shuaiyuancn While using `combineByKey` with `CompactBuffer`, `+=` and `++=` is exactly equivalent to `groupByKey`, `combineByKey` allows you to choose a more efficient data structure based on the data distribution. Arguably there are only a handful of cases when these cannot be replaced by repartitioning and external sorting, but it is most likely too low-level an approach for a typical user. – zero323 Jun 27 '16 at 19:19