1

I have a rdd like this

// Structure List[Tuple(x1, x2, value), Tuple(x1, x2, value)]
data = [('23', '98', 34), ('23', '89', 39), ('23', '12', 30), ('24', '12', 34), ('24', '14', 37), ('24', '16', 30)]

I am looking for the end result to be max value of score for x1 with x2 value associated with it. Like this

data = [('23', '89', 39), ('24', '14', 37)]

I tried reduceByKey but it is giving me max of each combinations which is not what I am looking for.

From comment:

This is what I tried:

max_by_group = (
    data.map(lambda x: (x[0], x))
        .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1])) 
        .values()
)
pault
  • 41,343
  • 15
  • 107
  • 149
nicholasnet
  • 2,117
  • 2
  • 24
  • 46
  • This is what I did max_by_group = ( data.map(lambda x: (x[0], x))   .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))    .values() ) – nicholasnet Aug 23 '18 at 18:53

3 Answers3

4

groupBy the first element, and then find the max value for each group by the third element in the tuple:

(rdd.groupBy(lambda x: x[0])
    .mapValues(lambda x: max(x, key=lambda y: y[2]))
    .values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]

Or use reduceByKey:

(rdd.map(lambda x: (x[0], x))
    .reduceByKey(lambda x, y: x if x[2] > y[2] else y)
    .values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]
Psidom
  • 209,562
  • 33
  • 339
  • 356
  • Thank you very much. I tried both methods. It took a while since there were nearly billion records. But to my surprise reduceByKey was faster than groupBy not sure why. Also your solution of reduceByKey was almost same as mine :) – nicholasnet Aug 23 '18 at 18:49
  • Yes `reduceByKey` is more efficient than `groupBy` when it's applicable, because `reduceByKey` is optimized and it combines data for each partition before shuffling; The shuffling after the by partition aggregation minimizes data transfer across the cluster, which is generally expensive. – Psidom Aug 23 '18 at 18:57
  • 1
    Hmm I did not know that. Thank you very much for sharing this info. – nicholasnet Aug 23 '18 at 18:59
2

@Psidom's answer is what you're looking for if you're using rdds. Another option is to convert your rdd to a DataFrame.

rdd = sc.parallelize(data)
df = rdd.toDF(["x1", "x2", "value"])
df.show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 98|   34|
#| 23| 89|   39|
#| 23| 12|   30|
#| 24| 12|   34|
#| 24| 14|   37|
#| 24| 16|   30|
#+---+---+-----+

Now you can group by x1 and filter the rows with the maximum value:

import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('x1')
df.withColumn('maxValue', f.max('value').over(w))\
    .where(f.col('value') == f.col('maxValue'))\
    .drop('maxValue')\
    .show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 89|   39|
#| 24| 14|   37|
#+---+---+-----+
pault
  • 41,343
  • 15
  • 107
  • 149
0

from itertools import groupby:

[max(list(j),key=lambda x:x[2]) for i,j in groupby(data,key = lambda x:x[0])]

Out[335]: [('23', '89', 39), ('24', '14', 37)]
Onyambu
  • 67,392
  • 3
  • 24
  • 53