I want to group some rows in an RDD by key so that I can perform more advanced operations on the rows within each group. Please note that I do not merely want to calculate some aggregate values. The rows are key-value pairs, where the key is a GUID and the value is a complex object.
As per the pyspark documentation, I first tried to implement this with combineByKey, since it is supposed to be more performant than groupByKey. The list at the beginning is just for illustration, not my real data:
l = list(range(1000))
numbers = sc.parallelize(l)
rdd = numbers.map(lambda x: (x % 5, x))
# createCombiner: build the initial list from the first value seen for a key
def f_init(initial_value):
    return [initial_value]

# mergeValue: add another value to a partition-local combiner
def f_merge(current_merged, new_value):
    if current_merged is None:
        current_merged = []
    return current_merged.append(new_value)

# mergeCombiners: merge combiners coming from different partitions
def f_combine(merged1, merged2):
    if merged1 is None:
        merged1 = []
    if merged2 is None:
        merged2 = []
    return merged1 + merged2

combined_by_key = rdd.combineByKey(f_init, f_merge, f_combine)
c = combined_by_key.collectAsMap()
i = 0
for k, v in c.items():
    if v is None:
        print(i, k, 'value is None.')
    else:
        print(i, k, len(v))
    i += 1
The output of this is:
0 0 0
1 1 0
2 2 0
3 3 0
4 4 0
This is not what I expected. The same logic implemented with groupByKey returns the correct output:
grouped_by_key = rdd.groupByKey()
d = grouped_by_key.collectAsMap()
i = 0
for k, v in d.items():
    if v is None:
        print(i, k, 'value is None.')
    else:
        print(i, k, len(v))
    i += 1
Returns:
0 0 200
1 1 200
2 2 200
3 3 200
4 4 200
So unless I'm missing something, this is a case where groupByKey is preferable to reduceByKey or combineByKey (the topic of the related discussion: Is groupByKey ever preferred over reduceByKey).
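For completeness, the reduceByKey-based alternative I have in mind would look roughly like this (just a sketch; grouped_via_reduce is an illustrative name). It wraps each value in a single-element list and concatenates the lists per key, so it groups correctly but copies lists on every merge:
# Sketch of the reduceByKey alternative: wrap each value in a list,
# then concatenate the per-key lists.
grouped_via_reduce = rdd.mapValues(lambda x: [x]).reduceByKey(lambda a, b: a + b)
print(grouped_via_reduce.mapValues(len).collectAsMap())  # I'd expect 200 per key here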