2

I have an rdd like the following:

[{'age': 2.18430371791803,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.80033330216659,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.8222365762732,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {...}]

I am trying to reduce each id to just 1 record by taking the highest frequency code using code like:

rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])

There is one problem with this implementation, it doesn't consider age, so if for example one id had multiple codes with a frequency of 2, it would take the last code.

To illustrate this issue, please consider this reduced id:

(u'"000PZ7S2G"',
 [(4.3218651186303, u'"388.400000"'),
  (4.34924421126357, u'"388.400000"'),
  (4.3218651186303, u'"389.900000"'),
  (4.34924421126357, u'"389.900000"'),
  (13.3667102491139, u'"794.310000"'),
  (5.99897016368982, u'"995.300000"'),
  (6.02634923989903, u'"995.300000"'),
  (4.3218651186303, u'"V72.19"'),
  (4.34924421126357, u'"V72.19"'),
  (13.3639723398581, u'"V81.2"'),
  (13.3667102491139, u'"V81.2"')])

my code would output:

[(2, u'"V81.2"')]

when I would like for it to output:

[(2, u'"388.400000"')]

because although the frequency is the same for both of these codes, code 388.400000 has a lesser age and appears first.

by adding this line after the .reduceByKey():

.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

I'm able to filter out those with greater than min age, but then I'm only considering those with min age and not all codes to calculate their frequency. I can't apply the same/ similar logic after [max(zip((x.count(i) for i in set(x)), set(x)))] as the set(x) is the set of x[1], which doesn't consider the age.

I should add, I don't want to just take the first code with the highest frequency, I'd like to take the highest frequency code with the least age, or the code that appears first, if this is possible, using only rdd actions.

equivalent code in SQL of what I'm trying to get would be something like:

SELECT code, count(*) as code_frequency
FROM (SELECT id, code, age
FROM (SELECT id, code, MIN(age) AS age, COUNT(*) as cnt,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC, MIN(age)) as seqnum
      FROM tbl
      GROUP BY id, code
     ) t
WHERE seqnum = 1) a
GROUP BY code
ORDER by code_frequency DESC
LIMIT 5;

and as a DF (though trying to avoid this):

wc = Window().partitionBy("id", "code").orderBy("age")
wc2 = Window().partitionBy("id")
df = rdd.toDF()
df = df.withColumn("count", F.count("code").over(wc))\
.withColumn("max", F.max("count").over(wc2))\
.filter("count = max")\
.groupBy("id").agg(F.first("age").alias("age"),
                           F.first("code").alias("code"))\
.orderBy("id")\
.groupBy("code")\
.count()\
.orderBy("count", ascending = False)

I'd really appreciate any help with this.

mad-a
  • 153
  • 3
  • 11
  • That looks like a duplicate of [How to select the first row of each group?](https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group) – user10938362 Mar 28 '20 at 16:04
  • It's a similar problem with very different solutions, none of which are in python using rdd operations. – mad-a Mar 28 '20 at 17:21
  • @mad-a, your question is somewhat confusing. in the equivalent SQL code, the MIN(age) is calculated but never used in any logic. the final aggregation sounds to merge the calculation between `id`s. it would be helpful if you can supply sample data with at least one different `id` and the expected result. – jxc Apr 10 '20 at 19:06
  • @jxc Hi, sorry about that, I forgot to add min(age) after count(*) desc in the order by statement within the id partition. In essence; if there are 2 codes with the same count, I want seqnum = 1 to be assigned to the row with the least min age. – mad-a Apr 11 '20 at 15:25

2 Answers2

1

Based on the SQL equivalent of your code, I converted the logic into the following rdd1 plus some post-processing (starting from the original RDD):

rdd = sc.parallelize([{'age': 4.3218651186303, 'code': '"388.400000"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"388.400000"', 'id': '"000PZ7S2G"'},
 {'age': 4.3218651186303, 'code': '"389.900000"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"389.900000"', 'id': '"000PZ7S2G"'},
 {'age': 13.3667102491139, 'code': '"794.310000"', 'id': '"000PZ7S2G"'},
 {'age': 5.99897016368982, 'code': '"995.300000"', 'id': '"000PZ7S2G"'},
 {'age': 6.02634923989903, 'code': '"995.300000"', 'id': '"000PZ7S2G"'},
 {'age': 4.3218651186303, 'code': '"V72.19"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"V72.19"', 'id': '"000PZ7S2G"'},
 {'age': 13.3639723398581, 'code': '"V81.2"', 'id': '"000PZ7S2G"'},
 {'age': 13.3667102491139, 'code': '"V81.2"', 'id': '"000PZ7S2G"'}])

rdd1 = rdd.map(lambda x: ((x['id'], x['code']),(x['age'], 1))) \
    .reduceByKey(lambda x,y: (min(x[0],y[0]), x[1]+y[1])) \
    .map(lambda x: (x[0][0], (-x[1][1] ,x[1][0], x[0][1]))) \
    .reduceByKey(lambda x,y: x if x < y else y) 
# [('"000PZ7S2G"', (-2, 4.3218651186303, '"388.400000"'))]

Where:

  1. use map to initialize the pair-RDD with key=(x['id'], x['code']), value=(x['age'], 1)
  2. use reduceByKey to calculate min_age and count
  3. use map to reset the pair-RDD with key=id and value=(-count, min_age, code)
  4. use reduceByKey to find the min value of tuples (-count, min_age, code) for the same id

The above steps are similar to:

  • Step (1) + (2): groupby('id', 'code').agg(min('age'), count())
  • Step (3) + (4): groupby('id').agg(min(struct(negative('count'),'min_age','code')))

You can then get the derived table a in your SQL by doing rdd1.map(lambda x: (x[0], x[1][2], x[1][1])), but this step is not necessary. the code can be counted directly from the above rdd1 by another map function + countByKey() method and then sort the result:

sorted(rdd1.map(lambda x: (x[1][2],1)).countByKey().items(), key=lambda y: -y[1])
# [('"388.400000"', 1)]

However, if what you are looking for is the sum(count) across all ids, then do the following:

rdd1.map(lambda x: (x[1][2],-x[1][0])).reduceByKey(lambda x,y: x+y).collect()
# [('"388.400000"', 2)]
jxc
  • 13,553
  • 4
  • 16
  • 34
  • thanks so much, you're an absolute hero! Thank you for breaking it down and explaining it so succinctly as well. :) – mad-a Apr 14 '20 at 14:46
0

If converting the rdd to a dataframe is an option, I think this approach may solve your problem:

from pyspark.sql.functions import row_number, col
from pyspark.sql import Window
df = rdd.toDF()
w = Window.partitionBy('id').orderBy('age')
df = df.withColumn('row_number', row_number.over(w)).where(col('row_number') == 1).drop('row_number')
danielcahall
  • 2,672
  • 8
  • 14
  • I'm trying to stick to using only rdd actions. But I'm not sure your code would actually work either, I'll edit the op to include a df version. – mad-a Mar 28 '20 at 21:39