
Assume I have the following RDD:

alist = [('a',[['1',2]]),('b',[['2',3]]),('b',[['8',5]]),('b',[['8',5]]),('c',[['4',22]]),('a',[['5',22]])]
anRDD = sc.parallelize(alist)

My task is, for each string letter (the key), to get the list with the highest int value (index 1 of the list). If there is a huge amount of data and a lot of different keys (string letters), which of the following methods is recommended?
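
For the sample above, the desired result would be one pair per key, each carrying the list with the biggest int, i.e. (ignoring ordering):

[('a', ['5', 22]), ('b', ['8', 5]), ('c', ['4', 22])]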

Method 1:

import operator

def sortAndTake(alistoflists):
    # sort descending by the int at index 1 and keep the best list
    alistoflists.sort(key=operator.itemgetter(1), reverse=True)
    return alistoflists[0]

reducedRDD = anRDD.reduceByKey(lambda a, b: a + b)  # concatenate all lists per key
finalRDD = reducedRDD.map(lambda x: (x[0], sortAndTake(x[1])))
finalRDD.collect()
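
To see what that reduceByKey is doing, here is the merge for key 'b' traced in plain Python (no Spark needed, reusing sortAndTake from above): the lambda concatenates all per-key lists before a single sort picks the winner.

merged = [['2', 3]] + [['8', 5]] + [['8', 5]]  # what lambda a, b: a + b builds for 'b'
print(sortAndTake(merged))                     # ['8', 5]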

Method 2:

from pyspark.rdd import portable_hash

def partitioner(n):
    # partition by the first element of the composite key (the letter)
    def partitioner_(x):
        return portable_hash(x[0]) % n
    return partitioner_

def sortIterator(iterator):
    # records arrive sorted descending, so the first record seen for
    # each key carries the highest int value
    oldKey = None
    for item in iterator:
        if item[0] != oldKey:
            oldKey = item[0]
            yield item

# build a composite (letter, int value) key for the secondary sort
partitioned = anRDD.keyBy(lambda kv: (kv[0], kv[1][0][1]))

finalRDD2 = (partitioned
             .repartitionAndSortWithinPartitions(numPartitions=2,
                                                 partitionFunc=partitioner(2),
                                                 ascending=False)
             .map(lambda x: x[1])          # drop the composite key
             .mapPartitions(sortIterator))
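
To make the secondary sort explicit: within each partition the records arrive ordered descending by the composite (letter, value) key, so sortIterator only has to keep the first record it sees per letter. A plain-Python illustration reusing sortIterator on hand-sorted input:

already_sorted = [('b', [['8', 5]]), ('b', [['8', 5]]), ('b', [['2', 3]]),
                  ('a', [['5', 22]]), ('a', [['1', 2]])]
print(list(sortIterator(already_sorted)))  # [('b', [['8', 5]]), ('a', [['5', 22]])]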

(Method 2 is inspired by the accepted answer, by zero323, to a previous question of mine: Using repartitionAndSortWithinPartitions.)

From my understanding, in the first method, if there are a lot of different key values, there is a lot of shuffling between the workers in the reduceByKey, which could make method 2 quicker (I am not sure whether the same happens when repartitionAndSortWithinPartitions is used in method 2).
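
One way to compare what each plan actually shuffles is to print the RDD lineages; a minimal sketch, assuming reducedRDD and finalRDD2 from the snippets above (PySpark's toDebugString may return bytes depending on the version, hence the guard):

for rdd in (reducedRDD, finalRDD2):
    s = rdd.toDebugString()
    print(s.decode() if isinstance(s, bytes) else s)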

Any insight? Thanks :)


1 Answer


My task is, for each string letter, to get the list with the highest int value (index 1 of the list).

If that's the case, both methods are very inefficient. Instead just reduceByKey with max:

from operator import itemgetter
from functools import partial

# unwrap the single inner list, then keep the per-key maximum by the int at index 1
anRDD.mapValues(itemgetter(0)).reduceByKey(partial(max, key=itemgetter(1)))
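
For the sample data from the question this gives (collect order may vary):

result = anRDD.mapValues(itemgetter(0)).reduceByKey(partial(max, key=itemgetter(1)))
print(result.collect())
# [('a', ['5', 22]), ('b', ['8', 5]), ('c', ['4', 22])]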

Regarding the two proposed methods:

  • Both shuffle the same amount of data.
  • The first one is just a less efficient groupByKey (see the sketch below).
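
To make that concrete, here is a hypothetical groupByKey rewrite of Method 1; it moves the same records across the network but skips the repeated list concatenations of lambda a, b: a + b:

from operator import itemgetter

anRDD.groupByKey().mapValues(
    lambda values: max((l for ls in values for l in ls), key=itemgetter(1)))
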
zero323