Assume I have the following RDD:
alist = [('a',[['1',2]]),('b',[['2',3]]),('b',[['8',5]]),('b',[['8',5]]),('c',[['4',22]]),('a',[['5',22]])]
anRDD = sc.parallelize(alist)
My task is, for each string key, to get the list with the highest int value (index 1 of the inner list). Given a huge amount of data and many distinct keys (string letters), which of the following methods is recommended?
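To make the goal concrete, for the sample data above the winning list per key would be:

[('a', ['5', 22]), ('b', ['8', 5]), ('c', ['4', 22])]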
Method 1:
import operator

def sortAndTake(alistoflists):
    # Sort the accumulated lists by their int value (index 1),
    # highest first, and keep only the winner.
    alistoflists.sort(key=operator.itemgetter(1), reverse=True)
    return alistoflists[0]

reducedRDD = anRDD.reduceByKey(lambda a, b: a + b)  # concatenate all lists per key
finalRDD = reducedRDD.map(lambda x: (x[0], sortAndTake(x[1])))
finalRDD.collect()
Method 2:
from pyspark.rdd import portable_hash

def partitioner(n):
    # Partition on the string letter only, so every composite key
    # (letter, value) for a given letter lands in the same partition.
    def partitioner_(x):
        return portable_hash(x[0]) % n
    return partitioner_

def sortIterator(iterator):
    # Each partition is sorted descending, so the first record seen
    # for each key carries the highest int value.
    oldKey = None
    for item in iterator:
        if item[0] != oldKey:
            oldKey = item[0]
            yield item

partitioned = anRDD.keyBy(lambda kv: (kv[0], kv[1][0][1]))
(partitioned
 .repartitionAndSortWithinPartitions(
     numPartitions=2,
     partitionFunc=partitioner(2), ascending=False)
 .map(lambda x: x[1])
 .mapPartitions(sortIterator))
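(If I am reading Method 2 correctly, one small difference: it yields the original tuples, so the value stays wrapped, e.g. ('a', [['5', 22]]), whereas Method 1 unwraps it to ('a', ['5', 22]).)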
(Method 2 is inspired by the accepted answer (by zero323) to a previous question of mine: Using repartitionAndSortWithinPartitions)
From my understanding, in the first method, if there are many distinct key values, there is a lot of shuffling between the workers during the reduceByKey, which could make Method 2 quicker (I am not sure whether the same happens when using repartitionAndSortWithinPartitions in Method 2).
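For what it's worth, here is a third variant I sketched while thinking about the shuffle, assuming only the single best list per key is needed. Since max is associative, reduceByKey should be able to combine map-side before the shuffle, so only one small record per key crosses the network (my assumption, not something I have benchmarked):

from operator import itemgetter

bestRDD = (anRDD
           .mapValues(lambda v: v[0])  # unwrap [['1', 2]] -> ['1', 2]
           .reduceByKey(lambda a, b: max(a, b, key=itemgetter(1))))
bestRDD.collect()
# [('a', ['5', 22]), ('b', ['8', 5]), ('c', ['4', 22])] (order may vary)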
Any insight? Thanks :)