
I want to group some rows in an RDD by key so I can perform more advanced operations on the rows within each group. Please note that I do not want to merely calculate some aggregate values. The rows are key-value pairs, where the key is a GUID and the value is a complex object.

As per the pyspark documentation, I first tried to implement this with combineByKey, as it is supposed to be more performant than groupByKey. The list at the beginning is just for illustration, not my real data:

l = list(range(1000))
numbers = sc.parallelize(l)
rdd = numbers.map(lambda x: (x % 5, x))

def f_init(initial_value):
    return [initial_value]

def f_merge(current_merged, new_value):
    if current_merged is None:
        current_merged = []
    return current_merged.append(new_value)

def f_combine(merged1, merged2):
    if merged1 is None:
        merged1 = []
    if merged2 is None:
        merged2 = []
    return merged1 + merged2

combined_by_key = rdd.combineByKey(f_init, f_merge, f_combine)

c = combined_by_key.collectAsMap()
i = 0
for k, v in c.items():
    if v is None:
        print(i, k, 'value is None.')
    else:
        print(i, k, len(v))
    i += 1

The output of this is:

0 0 0
1 1 0
2 2 0
3 3 0
4 4 0

This is not what I expected. The same logic implemented with groupByKey returns the correct output:

grouped_by_key = rdd.groupByKey()
d = grouped_by_key.collectAsMap()
i = 0
for k, v in d.items():
    if v is None:
        print(i, k, 'value is None.')
    else:
        print(i, k, len(v))
    i += 1

Returns:

0 0 200
1 1 200
2 2 200
3 3 200
4 4 200

So unless I'm missing something, this seems to be a case where groupByKey is preferred over reduceByKey or combineByKey (the topic of a related discussion: Is groupByKey ever preferred over reduceByKey).


1 Answer


It is a case where understanding the basic APIs is preferred. In particular, if you check the list.append docstring:

?list.append
## Docstring: L.append(object) -> None -- append object to end
## Type:      method_descriptor

you'll see that, like the other mutating methods in the Python API, it by convention doesn't return the modified object. That means f_merge always returns None and there is no accumulation whatsoever.
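
A minimal sketch of the fix, keeping the function names from the question: append in place, then return the accumulator explicitly. The None guards should also be unnecessary, since combineByKey calls f_init for the first value of each key in a partition:

def f_init(initial_value):
    # create the accumulator for the first value of a key seen in a partition
    return [initial_value]

def f_merge(current_merged, new_value):
    current_merged.append(new_value)  # append mutates in place and returns None
    return current_merged             # so return the list explicitly

def f_combine(merged1, merged2):
    # combine the per-partition accumulators
    return merged1 + merged2

combined_by_key = rdd.combineByKey(f_init, f_merge, f_combine)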

That being said, for most problems there are much more efficient solutions than groupByKey, but rewriting it with combineByKey (or aggregateByKey) is never one of them.
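
For illustration only (the question explicitly needs the whole groups, not an aggregate): when a per-key aggregate such as a count is all that is required, a reduceByKey sketch like the following avoids shuffling the full groups at all, using the same toy rdd from the question:

# rough sketch: per-key counts without materializing the groups
counts = rdd.mapValues(lambda _: 1).reduceByKey(lambda a, b: a + b)
print(counts.collectAsMap())  # expected: {0: 200, 1: 200, 2: 200, 3: 200, 4: 200}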

  • Thanks. Now that the issue is resolved, I tested the performance of both alternatives and the one with combineByKey is somewhat faster (on a larger dataset the difference should be more dramatic). I guess the reason is that combineByKey and groupByKey work differently and there is no guarantee that groupByKey will make the optimization to use data locality during the merges: http://codingjunkie.net/spark-combine-by-key/. – Eftim Jun 19 '16 at 12:58
  • There are two problems here. a) Scala's groupByKey specifically disables map-side aggregation (I reckon this is what you mean by _optimization to use data locality_) to improve performance by reducing GC time; b) the Python implementation is not equivalent to the one provided by Scala and is actually implemented via combineByKey. – zero323 Jun 19 '16 at 17:17