11

The Apache Spark pyspark.RDD API docs mention that groupByKey() is inefficient. Instead, it is recommended to use reduceByKey(), aggregateByKey(), combineByKey(), or foldByKey(). These perform some of the aggregation on the workers prior to the shuffle, thus reducing the amount of data shuffled across workers.

Given the following data set and groupByKey() expression, what is an equivalent and efficient implementation (reduced cross-worker data shuffling) that does not utilize groupByKey(), but delivers the same result?

dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
       .groupByKey())
print(sorted(rdd.mapValues(list).collect()))

Output:

[('a', [7, 8]), ('b', [3])]
Alberto Bonsanto
jsears
  • Is your data partitioned randomly, or by key? If you can ensure that all records with a._1 = "a" are on the same partition, you can speed up things dramatically -- you may be able to get away without needing any shuffles, other than the ones needed for the initial partitioning. Maybe try using a hash partitioner? – Glenn Strycker Jun 29 '15 at 22:25

2 Answers

18

As far as I can tell, there is nothing to gain* in this particular case by using aggregateByKey or a similar function. Since you're building a list, there is no "real" reduction, and the amount of data that has to be shuffled is more or less the same.

To really observe a performance gain, you need transformations that actually reduce the amount of transferred data, for example counting, computing summary statistics, or finding unique elements.
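To make the point concrete, here is a minimal pure-Python sketch (not Spark code) that models two partitions and counts how many values would cross the shuffle boundary after a map-side combine. The partition layout and the helper names `map_side_combine` and `shuffled_values` are assumptions made purely for illustration.

```python
# Hypothetical layout: two partitions living on different workers.
partitions = [
    [("a", 7), ("a", 8), ("b", 3)],
    [("a", 1), ("b", 4), ("b", 5)],
]

def map_side_combine(partition, create, merge):
    """Combine values per key inside one partition, before any shuffle."""
    combined = {}
    for key, value in partition:
        if key in combined:
            combined[key] = merge(combined[key], value)
        else:
            combined[key] = create(value)
    return combined

def shuffled_values(combined_partitions, size):
    """Count the individual values that would be sent across the network."""
    return sum(size(c) for part in combined_partitions for c in part.values())

# Building lists: every original value is still inside some list.
as_lists = [map_side_combine(p, lambda v: [v], lambda acc, v: acc + [v])
            for p in partitions]
list_cost = shuffled_values(as_lists, len)

# Summing: each partition sends a single number per key.
as_sums = [map_side_combine(p, lambda v: v, lambda acc, v: acc + v)
           for p in partitions]
sum_cost = shuffled_values(as_sums, lambda _: 1)

print(list_cost, sum_cost)  # 6 4
```

In this toy layout the list-building combine still moves all six values, while the sum moves only four numbers (one per key per partition).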

Regarding the differences between reduceByKey(), combineByKey(), and foldByKey(), there is an important conceptual distinction which is easier to see when you consider the Scala API signatures.

Both reduceByKey and foldByKey map from RDD[(K, V)] to RDD[(K, V)], while the second one takes an additional zero element.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

combineByKey (aggregateByKey is not shown here, but it is the same type of transformation) transforms from RDD[(K, V)] to RDD[(K, C)]:

combineByKey[C](
   createCombiner: (V) ⇒ C,
   mergeValue: (C, V) ⇒ C,
   mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Going back to your example, only combineByKey (and in PySpark aggregateByKey) is really applicable, since you are transforming from RDD[(String, Int)] to RDD[(String, List[Int])].
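The roles of the three functions can be sketched in plain Python. This is a simplified single-process model, not Spark's implementation: the function `combine_by_key` and the explicit list of partitions are hypothetical stand-ins for what Spark does across workers.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Simplified, single-process model of combineByKey semantics."""
    # Step 1: within each partition, fold values (V) into combiners (C).
    per_partition = []
    for partition in partitions:
        combiners = {}
        for key, value in partition:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    # Step 2: after the (simulated) shuffle, merge combiners for the same key.
    result = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result

# Assumed layout: the two "a" records sit in different partitions.
partitions = [[("a", 7), ("b", 3)], [("a", 8)]]

grouped = combine_by_key(
    partitions,
    lambda v: [v],             # V -> C, here Int -> List[Int]
    lambda acc, v: acc + [v],  # (C, V) -> C
    lambda a, b: a + b,        # (C, C) -> C
)
print(sorted(grouped.items()))  # [('a', [7, 8]), ('b', [3])]
```

The value type (an int) and the combiner type (a list of ints) differ, which is exactly what the RDD[(K, V)] to RDD[(K, C)] signature allows.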

While in a dynamic language like Python it is actually possible to perform such an operation using foldByKey or reduceByKey, it makes the semantics of the code unclear and, to cite @tim-peters, "There should be one-- and preferably only one --obvious way to do it" [1].

The difference between aggregateByKey and combineByKey is pretty much the same as between reduceByKey and foldByKey, so for a list it is mostly a matter of taste:
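For illustration only, this is roughly what the reduceByKey route looks like once each value has been wrapped in a list. The sketch works on plain Python lists with `functools.reduce` rather than on a live RDD; on an RDD the corresponding calls would presumably be `mapValues` followed by `reduceByKey`, which is an assumption about how one would port it, not code taken from this answer.

```python
from functools import reduce
from itertools import groupby
from operator import add, itemgetter

dataset = [("a", 7), ("b", 3), ("a", 8)]

# Wrap every value in a list so that V is already a list; only then does a
# (V, V) -> V function such as list concatenation make sense.
wrapped = [(key, [value]) for key, value in dataset]

# Local stand-in for a reduce-by-key: sort by key (stable), group, reduce.
wrapped.sort(key=itemgetter(0))
reduced = [
    (key, reduce(add, (values for _, values in group)))
    for key, group in groupby(wrapped, key=itemgetter(0))
]
print(reduced)  # [('a', [7, 8]), ('b', [3])]
```

The extra wrapping step exists only to satisfy the (V, V) -> V shape, which is why this route obscures the intent compared with a combiner-based version.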

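def merge_value(acc, x):
    # Append one value to a partition-local accumulator list.
    acc.append(x)
    return acc

def merge_combiners(acc1, acc2):
    # Merge accumulator lists coming from different partitions.
    acc1.extend(acc2)
    return acc1

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
   .combineByKey(
       lambda x: [x],       # createCombiner: start a new list for a key
       merge_value,
       merge_combiners))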

In practice you should prefer groupByKey though. The PySpark implementation is significantly more optimized than a naive implementation like the one provided above.

1. Peters, T. PEP 20 -- The Zen of Python (2004). https://www.python.org/dev/peps/pep-0020/


* In practice there is actually quite a lot to lose here, especially when using PySpark. The Python implementation of groupByKey is significantly more optimized than a naive combineByKey. You can check Be Smart About groupByKey, created by me and @eliasah, for additional discussion.

Community
zero323
  • If you use a partitioner (say, partition by a hash of the key), can you get away without needing any other shuffles? – Glenn Strycker Jun 29 '15 at 22:26
  • @GlennStrycker As far as I know the answer is positive. If RDD is partitioned by key then all values for a given key should be processed locally on a single node. Possible problem is a skewed distribution of keys though. – zero323 Jun 29 '15 at 22:40
3

Here is one option that uses aggregateByKey(). I'd be curious to hear how this can be done using reduceByKey(), combineByKey(), or foldByKey(), and what cost/benefit there is to each alternative.

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .aggregateByKey(list(),
                       lambda u,v: u+[v],
                       lambda u1,u2: u1+u2))
print(sorted(rdd.mapValues(list).collect()))

Output:

[('a', [7, 8]), ('b', [3])]

The following is a slightly more memory-efficient implementation, though less readable to the Python novice, which produces the same output:

import itertools

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .aggregateByKey(list(),
                       lambda u,v: itertools.chain(u,[v]),
                       lambda u1,u2: itertools.chain(u1,u2)))
print(sorted(rdd.mapValues(list).collect()))
jsears