0

I have a list of map e.g

[{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20}  ,{'a' : 0,'b': 20} } 

I want to get the average of values of a and b. So the expected output is

a = (10 + 5 + 0 + 0) /3 = 5 ;
b = 80/4 = 20.

How can i do it efficiently using RDD

Vinny
  • 865
  • 1
  • 11
  • 25

3 Answers3

1

The easiest might be map your rdd element to a format like:

init = {'a': {'sum': 0, 'cnt': 0}, 'b': {'sum': 0, 'cnt': 0}}

i.e. record the sum and count for each key, and then reduce it.

Map function:

def map_fun(d, keys=['a', 'b']):
    map_d = {}
    for k in keys:
        if k in d:
            temp = {'sum': d[k], 'cnt': 1}
        else:
            temp = {'sum': 0, 'cnt': 0}
        map_d[k] = temp
    return map_d

Reduce function:

def reduce_fun(a, b, keys=['a', 'b']):
    from collections import defaultdict
    reduce_d = defaultdict(dict)
    for k in keys:
        reduce_d[k]['sum'] = a[k]['sum'] + b[k]['sum']
        reduce_d[k]['cnt'] = a[k]['cnt'] + b[k]['cnt']
    return reduce_d

rdd.map(map_fun).reduce(reduce_fun)
# defaultdict(<type 'dict'>, {'a': {'sum': 15, 'cnt': 3}, 'b': {'sum': 80, 'cnt': 4}})

Calculate the average:

d = rdd.map(map_fun).reduce(reduce_fun)
{k: v['sum']/v['cnt'] for k, v in d.items()}
{'a': 5, 'b': 20}
Psidom
  • 209,562
  • 33
  • 339
  • 356
1

Given the structure of your data you should be able to use the dataframe api to achieve this calculation. If you need an rdd it is not to hard to get from the dataframe back to an rdd.

from pyspark.sql import functions as F
df = spark.createDataFrame([{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20}  ,{'a' : 0,'b': 20}])

Dataframe looks like this

+----+---+
|   a|  b|
+----+---+
|  10| 20|
|   5| 20|
|null| 20|
|   0| 20|
+----+---+

Then it follows simply to calculate averages using the pyspark.sql functions

cols = df.columns
df_means = df.agg(*[F.mean(F.col(col)).alias(col+"_mean") for col in cols])
df_means.show()

OUTPUT:

+------+------+
|a_mean|b_mean|
+------+------+
|   5.0|  20.0|
+------+------+
johnmdonich
  • 339
  • 1
  • 6
0

You can use defaultdict to collect similar keys and their values as list. Then simply aggregate using sum of values divided by number of elements of list for each value.

from collections import defaultdict

x = [{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20}  ,{'a' : 0,'b': 20}]
y = defaultdict(lambda: [])
[y[k].append(v) for i in x for k,v in i.items() ]

for k,v in y.items():
    print k, "=" ,sum(v)/len(v)

>>> y
defaultdict(<function <lambda> at 0x02A43BB0>, {'a': [10, 5, 0], 'b': [20, 20, 20, 20]})
>>> 

>>> 
a = 5
b = 20
Anil_M
  • 10,893
  • 6
  • 47
  • 74