
Suppose I have the following RDD in pyspark, where each row is a list:

[foo, apple]
[foo, orange]
[foo, apple]
[foo, apple]
[foo, grape]
[foo, grape]
[foo, plum]
[bar, orange]
[bar, orange]
[bar, orange]
[bar, grape]
[bar, apple]
[bar, apple]
[bar, plum]
[scrog, apple]
[scrog, apple]
[scrog, orange]
[scrog, orange]
[scrog, grape]
[scrog, plum]

I would like to show the top 3 fruits (index 1) for each group (index 0), ordered by the count of each fruit. For the sake of simplicity, suppose I don't care much about ties (e.g. scrog has count 1 for both grape and plum; I don't care which is picked).

My goal is output like:

foo, apple, 3
foo, grape, 2
foo, orange, 1
bar, orange, 3
bar, apple, 2
bar, plum, 1   # <------- NOTE: could also be "grape" of count 1
scrog, orange, 2  # <---------- NOTE: "scrog" has many ties, which is okay
scrog, apple, 2
scrog, grape, 1

I can think of a likely inefficient approach (roughly sketched below):

  • get unique groups and .collect() as list
  • filter full rdd by group, count and sort fruits
  • use something like zipWithIndex() to get top 3 counts
  • save as new RDD with format (<group>, <fruit>, <count>)
  • union all RDDs at end
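
Roughly, in untested pseudocode (variable names are just for illustration):

# hypothetical sketch of the approach above -- untested
groups = rdd.map(lambda x: x[0]).distinct().collect()   # unique groups to the driver

per_group = []
for g in groups:
    counted = (rdd.filter(lambda row, g=g: row[0] == g)            # filter full rdd by group
                  .map(lambda row: (row[1], 1))
                  .reduceByKey(lambda a, b: a + b)                  # count each fruit
                  .sortBy(lambda kv: kv[1], ascending=False))       # sort fruits by count
    top3 = (counted.zipWithIndex()
                   .filter(lambda t: t[1] < 3)                      # keep the top 3 positions
                   .map(lambda t, g=g: (g, t[0][0], t[0][1])))      # (<group>, <fruit>, <count>)
    per_group.append(top3)

result = sc.union(per_group)  # union all RDDs at end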

But I'm interested not only in more Spark-specific approaches, but also in ones that might skip expensive operations like collect() and zipWithIndex().

As a bonus -- but not required -- if I did want to apply sorting/filtering to address ties, where might that best be accomplished?

Any advice much appreciated!

UPDATE: in my context, I'm unable to use DataFrames; I must use RDDs only.

ghukill

2 Answers


map and reduceByKey operations in pyspark

Sum the counts with .reduceByKey, gather each group's (count, fruit) pairs with .groupByKey, then select the top 3 of each group with .map and heapq.nlargest.

rdd = sc.parallelize([
    ["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
    ["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
    ["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
    ["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
    ["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
])

from operator import add
from heapq import nlargest

n = 3

results = (rdd.map(lambda x: ((x[0], x[1]), 1))              # ((group, fruit), 1)
              .reduceByKey(add)                              # ((group, fruit), count)
              .map(lambda x: (x[0][0], (x[1], x[0][1])))     # (group, (count, fruit))
              .groupByKey()                                  # (group, iterable of (count, fruit))
              .map(lambda x: (x[0], nlargest(n, x[1]))))     # (group, top-n [(count, fruit), ...])

print( results.collect() )
# [('bar', [(3, 'orange'), (2, 'apple'), (1, 'plum')]),
#  ('scrog', [(2, 'orange'), (2, 'apple'), (1, 'plum')]),
#  ('foo', [(3, 'apple'), (2, 'grape'), (1, 'plum')])]
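
If you'd rather have flat (group, fruit, count) rows like the desired output in the question, you could flatten the grouped result with .flatMap (a small sketch reusing the results RDD above; output order may vary):

# flatten the grouped top-3 lists into (group, fruit, count) rows
flat = results.flatMap(lambda x: [(x[0], fruit, count) for count, fruit in x[1]])
print(flat.collect())
# e.g. [('bar', 'orange', 3), ('bar', 'apple', 2), ('bar', 'plum', 1),
#       ('scrog', 'orange', 2), ('scrog', 'apple', 2), ('scrog', 'plum', 1),
#       ('foo', 'apple', 3), ('foo', 'grape', 2), ('foo', 'plum', 1)]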

Standard python

For comparison, if you have a simple python list instead of an rdd, the easiest way to do grouping in python is with dictionaries:

data = [
    ["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
    ["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
    ["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
    ["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
    ["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
]

from heapq import nlargest
from operator import itemgetter

d = {}  # nested dict: {group: {fruit: count}}
for k, v in data:
    d.setdefault(k, {})
    d[k][v] = d[k].get(v, 0) + 1
print(d)
# {'foo': {'apple': 3, 'orange': 1, 'grape': 2, 'plum': 1}, 'bar': {'orange': 3, 'grape': 1, 'apple': 2, 'plum': 1}, 'scrog': {'apple': 2, 'orange': 2, 'grape': 1, 'plum': 1}}

n = 3
results = [(k,v,c) for k,subd in d.items()
                   for v,c in nlargest(n, subd.items(), key=itemgetter(1))]
print(results)
# [('foo', 'apple', 3), ('foo', 'grape', 2), ('foo', 'orange', 1), ('bar', 'orange', 3), ('bar', 'apple', 2), ('bar', 'grape', 1), ('scrog', 'apple', 2), ('scrog', 'orange', 2), ('scrog', 'grape', 1)]
Stef
  • Just for my own learning/understanding, seeing how `nlargest()` could be replaced by `.map(lambda x: (x[0], sorted(x[1], key=lambda x: x[0], reverse=True)[:n]))` (as the source states `Equivalent to: sorted(iterable, key=key, reverse=True)[:n]`) but really like the `nlargest()` syntax. Thanks for sharing that. – ghukill Mar 08 '22 at 13:38
  • @ghukill Note that `nlargest(n, iterable)` is only equivalent to `sorted(iterable, reverse=True)[:n]` in the sense that it produces the same result. However, if there are K elements in the iterable and n is much smaller than K, then `nlargest` is more efficient. Using `sorted` takes time Θ(K log K) and space Θ(K), whereas using `nlargest` takes time Θ(K log n) and space Θ(n). – Stef Mar 08 '22 at 14:28
  • In your example, if the groups are rather small, then it doesn't really matter. I prefer using `nlargest` because I think it makes the code easier to read, since `nlargest` has a very explicit name. – Stef Mar 08 '22 at 14:30
  • @ghukill Also note that I reversed the order of the tuples: in the output, we get `(3, 'orange')` instead of `('orange', 3)`. Instead, you could keep the tuples in their original order, and use the `key` optional argument of `nlargest` to specify which field of the tuple is relevant. – Stef Mar 08 '22 at 14:35
  • @ghukill And lastly, when manipulating fields from tuples with `.map`, you might find `from operator import itemgetter` convenient to avoid using `lambda`. For instance, `rdd.map(lambda x: (x[2], x[1], x[0]))` is equivalent to `rdd.map(itemgetter(2,1,0))`. However, sadly `itemgetter` cannot deal with nested tuples, and I had to nest the tuples in order for `reduceByKey` and `groupByKey` to work as intended. – Stef Mar 08 '22 at 14:38
  • all helpful and interesting, appreciate it. The ordering of tuples, yep, all that made sense. But -- embarrassingly -- I hadn't considered that the code behind `nlargest()` might be different than `sorted(...)`; read that bit in the source I mentioned above and just assumed it might be like an alias. But it makes sense coming from the `heapq` module. – ghukill Mar 08 '22 at 19:57
  • @ghukill The documentation for standard python modules is full of "this is equivalent to..." and they always mean "equivalent" in the sense that it produces the same result, but usually using the named function is more efficient than the "equivalent" code. The actual implementation of `nlargest` in CPython is here: https://github.com/python/cpython/blob/3.10/Lib/heapq.py#L521 ; the main algorithm is on lines 545-559. On lines 540-541 you can see that *in some cases*, `nlargest` actually calls `sorted` internally; but that's only when `n` is equal to the length of the iterable. – Stef Mar 09 '22 at 09:38
  • @ghukill See [this related question](https://stackoverflow.com/a/23038826/3080723) for a very brief explanation of the nlargest algorithm. – Stef Mar 09 '22 at 09:40
  • _awesome_, thanks! Would be really interesting to somehow model these kinds of dependencies in the standard lib; e.g. relationships like `same_func_sig_input` and/or `same_func_sig_output`. Kind of rabbit-holey there and unrelated, but these kinds of linkages kindle those thoughts. Appreciate all the backstory here. – ghukill Mar 09 '22 at 14:02
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
spark = (SparkSession.builder.appName("foo").getOrCreate())

initial_list = [["foo", "apple"], ["foo", "orange"],
            ["foo", "apple"], ["foo", "apple"],
            ["foo", "grape"], ["foo", "grape"],
            ["foo", "plum"], ["bar", "orange"],
            ["bar", "orange"], ["bar", "orange"],
            ["bar", "grape"], ["bar", "apple"],
            ["bar", "apple"], ["bar", "plum"],
            ["scrog", "apple"], ["scrog", "apple"],
            ["scrog", "orange"], ["scrog", "orange"],
            ["scrog", "grape"], ["scrog", "plum"]]
# creating rdd
rdd = spark.sparkContext.parallelize(initial_list)
# converting rdd to dataframe
df = rdd.toDF()

# group by index 0 and index 1 to get count of each
df2 = df.groupby(df._1, df._2).count()

window = Window.partitionBy(df2['_1']).orderBy(df2['count'].desc())
# keep rows whose rank is in the top 3 counts per group
# (note: rank() gives ties the same rank, so a group can return more than 3 rows)
df3 = df2.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 3)
# display the required dataframe
df3.select('_1', '_2', 'count').orderBy('_1', col('count').desc()).show()