
My RDD is made of many items, each of which is a tuple as follows:

(key1, (val1_key1, val2_key1))
(key2, (val1_key2, val2_key2))
(key1, (val1_again_key1, val2_again_key1))
... and so on

I used groupByKey on the RDD, which gave a result like:

(key1, [(val1_key1, val2_key1), (val1_again_key1, val2_again_key1), (), ... ()])
(key2, [(val1_key2, val2_key2), (), ... ()])
... and so on
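
Roughly, the call looks like this on a small sample (groupByKey actually returns an iterable per key, so mapValues(list) is only there to display each group as a list; the output order may differ):

rdd = sc.parallelize([
    ("key1", ("val1_key1", "val2_key1")),
    ("key2", ("val1_key2", "val2_key2")),
    ("key1", ("val1_again_key1", "val2_again_key1"))])

# Group values per key and materialize each group as a list for display
rdd.groupByKey().mapValues(list).collect()
# [('key1', [('val1_key1', 'val2_key1'), ('val1_again_key1', 'val2_again_key1')]),
#  ('key2', [('val1_key2', 'val2_key2')])]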

I need to do the same using reduceByKey. I tried:

RDD.reduceByKey(lambda val1, val2: list(val1).append(val2))

but it doesn't work.
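
(A quick local check with plain Python tuples like the sample values above shows part of the problem: append mutates the temporary list in place and returns None, so the lambda never returns a list.)

val1 = ("val1_key1", "val2_key1")
val2 = ("val1_again_key1", "val2_again_key1")

merged = list(val1).append(val2)
print(merged)  # None -- list.append returns None, not the extended list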

Please suggest the right way to implement this using reduceByKey().

  • This should work...but it starts as a list, so I think you just need to remove the list wrapper....`val1.append(val2)` – Justin Pihony Dec 15 '15 at 15:42
  • @JustinPihony I just tried this but it throws "AttributeError: 'tuple' object has no attribute 'append'", maybe because the first element it starts to work with is not a list – London guy Dec 15 '15 at 15:45
  • a) You cannot (in a non-hacky way) do __the same__ because the types on the LHS and RHS don't match. b) It is significantly less efficient because it __doesn't reduce anything__. See http://stackoverflow.com/q/33221713/1560062 – zero323 Dec 15 '15 at 15:49
  • Ahh, I don't know my python types well enough...I thought you already had a list – Justin Pihony Dec 15 '15 at 16:43

1 Answer


The answer is that you cannot (or at least not in a straightforward and Pythonic way without abusing language dynamism). Since the value type and the return type are different (a list of tuples vs. a single tuple), reduce is not a valid function here. You could use combineByKey or aggregateByKey instead, for example like this:

rdd = sc.parallelize([
    ("key1", ("val1_key1", "val2_key1")),
    ("key2", ("val1_key2", "val2_key2"))])

rdd.aggregateByKey(
    [],                              # zero value: an empty list per key
    lambda acc, x: acc + [x],        # seqOp: append a value within a partition
    lambda acc1, acc2: acc1 + acc2)  # combOp: merge lists across partitions
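
An equivalent sketch with combineByKey (the lambdas here are just one way to build up the list per key, not taken from the original answer):

rdd.combineByKey(
    lambda x: [x],                   # createCombiner: start a list from the first value for a key
    lambda acc, x: acc + [x],        # mergeValue: add further values within a partition
    lambda acc1, acc2: acc1 + acc2)  # mergeCombiners: concatenate lists across partitions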

but either way it is just a less efficient version of groupByKey. See also Is groupByKey ever preferred over reduceByKey

  • Wouldn't using aggregateByKey in this case prevent the keys from being shuffled across the network? Meaning that the function would run for whatever keys it has on whatever node it is on without worrying about the existence of keys on other nodes. – Jared Feb 11 '16 at 21:01
  • @Salmonerd Actually, I've looked at the PySpark code and it doesn't disable map-side aggregation yet, so this is (with some small differences) equivalent to `groupByKey`. In general there is another problem: if the number of unique keys per partition is low, map-side aggregation can be beneficial, but in general it is expensive since it has to create a large number of temporary objects. This is a slow process and these objects have to be garbage collected later. – zero323 Feb 11 '16 at 22:32
  • Thanks for the additional clarification. Do you know if using the Scala version would have any performance gains over pySpark or would offer a better solution? – Jared Feb 11 '16 at 23:04
  • I wouldn't expect anything spectacular. While it can be slightly faster, most of the job is probably serialization, transfer and deserialization. – zero323 Feb 11 '16 at 23:08