I have an RDD like the following:
[{'age': 2.18430371791803,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{'age': 2.80033330216659,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{'age': 2.8222365762732,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{...}]
I am trying to reduce each id to a single record by taking its highest-frequency code, using code like:
rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))\
    .reduceByKey(lambda x, y: x + y)\
    .map(lambda x: [i[1] for i in x[1]])\
    .map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])
There is one problem with this implementation: it doesn't consider age. If, for example, one id has multiple codes with a frequency of 2, max() breaks the tie by comparing the code strings, so it returns the code that sorts last.
To illustrate this issue, please consider this reduced id:
(u'"000PZ7S2G"',
[(4.3218651186303, u'"388.400000"'),
(4.34924421126357, u'"388.400000"'),
(4.3218651186303, u'"389.900000"'),
(4.34924421126357, u'"389.900000"'),
(13.3667102491139, u'"794.310000"'),
(5.99897016368982, u'"995.300000"'),
(6.02634923989903, u'"995.300000"'),
(4.3218651186303, u'"V72.19"'),
(4.34924421126357, u'"V72.19"'),
(13.3639723398581, u'"V81.2"'),
(13.3667102491139, u'"V81.2"')])
My code would output:
[(2, u'"V81.2"')]
whereas I would like it to output:
[(2, u'"388.400000"')]
because, although the frequency is the same for both of these codes, code 388.400000 has a lower age and appears first.
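To make the intended rule concrete, here is a minimal sketch of it applied to the grouped list for one id. The helper name pick_code is hypothetical. It ranks codes by highest count, then lowest minimum age, then earliest position in the list. The position tie-breaker assumes the list order reflects the original record order, which concatenating lists in reduceByKey does not guarantee in general.

```python
def pick_code(records):
    """Pick one code from a list of (age, code) tuples belonging to a single id.

    Ranking: highest frequency, then lowest minimum age, then earliest position.
    """
    stats = {}  # code -> [count, min_age, first_position]
    for pos, (age, code) in enumerate(records):
        if code not in stats:
            stats[code] = [1, age, pos]
        else:
            entry = stats[code]
            entry[0] += 1
            entry[1] = min(entry[1], age)
    # min() with a negated count puts the most frequent code first
    best = min(stats, key=lambda c: (-stats[c][0], stats[c][1], stats[c][2]))
    return [(stats[best][0], best)]


# The reduced id from the example above
sample = [
    (4.3218651186303, u'"388.400000"'),
    (4.34924421126357, u'"388.400000"'),
    (4.3218651186303, u'"389.900000"'),
    (4.34924421126357, u'"389.900000"'),
    (13.3667102491139, u'"794.310000"'),
    (5.99897016368982, u'"995.300000"'),
    (6.02634923989903, u'"995.300000"'),
    (4.3218651186303, u'"V72.19"'),
    (4.34924421126357, u'"V72.19"'),
    (13.3639723398581, u'"V81.2"'),
    (13.3667102491139, u'"V81.2"'),
]
result = pick_code(sample)
print(result)
```

In the pipeline above, this would stand in for the last two map steps, for example as .map(lambda x: pick_code(x[1])) directly after the .reduceByKey().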
By adding this line after the .reduceByKey():
.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))
I'm able to filter out the records whose age is greater than the minimum, but then only the minimum-age records are used to calculate frequency, rather than all codes. I can't apply the same or similar logic after [max(zip((x.count(i) for i in set(x)), set(x)))], because by that point x holds only the codes from x[1], so set(x) carries no age information.
I should add that I don't want to just take the first code with the highest frequency. I'd like to take the highest-frequency code with the lowest age, or the code that appears first, if this is possible using only RDD operations.
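One possible shape for this, sketched under assumptions: aggregate per (id, code) into a (count, min_age, first_index) tuple, then reduce per id to the best-ranked code. The function names merge_stats and better are hypothetical, and reduce_by_key is only a local stand-in for RDD.reduceByKey so the sketch runs without Spark. The rows are a subset of the example id above. On a real RDD, the index could come from rdd.zipWithIndex(), and the two functions would be passed to two successive reduceByKey calls.

```python
def merge_stats(a, b):
    """Combine two (count, min_age, first_index) tuples for the same (id, code)."""
    return (a[0] + b[0], min(a[1], b[1]), min(a[2], b[2]))


def better(a, b):
    """Choose between two (stats, code) pairs for the same id.

    Ranking: highest count, then lowest minimum age, then earliest index.
    """
    def rank(pair):
        count, min_age, first_index = pair[0]
        return (-count, min_age, first_index)
    return a if rank(a) <= rank(b) else b


def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey (assumption: used here only for testing)."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return list(out.items())


# Subset of the example id from the question
rows = [
    {"id": u'"000PZ7S2G"', "age": 4.3218651186303, "code": u'"388.400000"'},
    {"id": u'"000PZ7S2G"', "age": 4.34924421126357, "code": u'"388.400000"'},
    {"id": u'"000PZ7S2G"', "age": 13.3667102491139, "code": u'"794.310000"'},
    {"id": u'"000PZ7S2G"', "age": 13.3639723398581, "code": u'"V81.2"'},
    {"id": u'"000PZ7S2G"', "age": 13.3667102491139, "code": u'"V81.2"'},
]

# Step 1: key by (id, code); the value is (count, age, index of the record)
keyed = [((r["id"], r["code"]), (1, r["age"], i)) for i, r in enumerate(rows)]
per_code = reduce_by_key(keyed, merge_stats)

# Step 2: re-key by id and keep the best-ranked (stats, code) pair
per_id = reduce_by_key([(k[0], (stats, k[1])) for k, stats in per_code], better)

result = {key: (stats[0], code) for key, (stats, code) in per_id}
print(result)
```

Because both steps are associative and commutative reductions, no per-id list of records has to be built, which may matter when a single id has many rows.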
Equivalent SQL for what I'm trying to get would be something like:
SELECT code, COUNT(*) AS code_frequency
FROM (SELECT id, code, age
      FROM (SELECT id, code, MIN(age) AS age, COUNT(*) AS cnt,
                   ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC, MIN(age)) AS seqnum
            FROM tbl
            GROUP BY id, code
           ) t
      WHERE seqnum = 1) a
GROUP BY code
ORDER BY code_frequency DESC
LIMIT 5;
And as a DataFrame (though I'm trying to avoid this):
from pyspark.sql import functions as F
from pyspark.sql import Window

# Running count of each code within an id, ordered by age
wc = Window.partitionBy("id", "code").orderBy("age")
# All rows of an id, used to find the highest count
wc2 = Window.partitionBy("id")
df = rdd.toDF()
df = df.withColumn("count", F.count("code").over(wc))\
    .withColumn("max", F.max("count").over(wc2))\
    .filter("count = max")\
    .groupBy("id").agg(F.first("age").alias("age"),
                       F.first("code").alias("code"))\
    .orderBy("id")\
    .groupBy("code")\
    .count()\
    .orderBy("count", ascending=False)
I'd really appreciate any help with this.