
I have an RDD with 2 partitions, and the data consists of key-value pairs:

rdd5.glom().collect()

[[(u'hive', 1), (u'python', 1), (u'spark', 1), (u'hive', 1), (u'spark', 1), (u'python', 1)], [(u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)]]

When I perform aggregateByKey:

rdd6=rdd5.aggregateByKey((0,0), lambda acc,val: (acc[0]+1,acc[1]+val), lambda acc1,acc2 : (acc1[1]+acc2[1])/acc1[0]+acc2[0])

it does not give me the expected result:

Output:

[(u'python', (2, 2)), (u'spark', 1), (u'java', (2, 2)), (u'hive', (2, 2))]

Expected:

[(u'python', 1), (u'spark', 1), (u'java', 1), (u'hive', 1)]

I can see that keys present in only one partition are not giving me the expected output. What changes should I make to get the expected result?

  • What are you trying to do? Why are you expecting to get [(u'python', 1), (u'spark', 1), (u'java', 1), (u'hive', 1)]? – user3689574 May 20 '18 at 17:38
  • I am new to Spark and learning all the transformations. I actually want the average of each skill present in the input. I am trying to understand the aggregateByKey transformation and its usage – pandi May 20 '18 at 17:45
  • The average of what? The value? – user3689574 May 20 '18 at 17:57

1 Answer


OK, so below is the way to do this using both reduceByKey and aggregateByKey.

The problem you had with aggregateByKey is that the last function is responsible for merging two accumulators (one per partition). It has to return the same structure as the other functions, so that when yet another accumulator (from another partition) is merged in, it still works. In your code the merge function computes an average instead of returning a (count, sum) tuple, and it is only ever called for keys that appear in more than one partition; that is why keys living in a single partition still show the raw (2, 2) accumulator, and only 'spark' gets combined.
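Applying just that fix to your original call would look like this (a sketch, assuming the rdd5 from your question):

rdd6 = rdd5.aggregateByKey((0, 0),
                           lambda acc, val: (acc[0] + 1, acc[1] + val),
                           lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # now returns a (count, sum) tuple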

It is very similar to combineByKey (see the sketch after the example output below).

rdd = sc.parallelize([(u'hive', 1), (u'python', 1), (u'spark', 1),\
    (u'hive', 1), (u'spark', 1), (u'python', 1), (u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)])

print rdd.aggregateByKey((0, 0),                                                       # zero value: (count, sum)
                         lambda acc, val: (acc[0] + 1, acc[1] + val),                   # seqOp: fold one value into (count, sum)
                         lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])).collect()  # combOp: merge two accumulators

# Same result with reduceByKey: first map each value to a (count, sum) pair, then add the pairs element-wise
print rdd.mapValues(lambda x: (1, x)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()

[(u'spark', (4, 4)), (u'java', (2, 2)), (u'hive', (2, 2)), (u'python', (2, 2))]

[(u'spark', (4, 4)), (u'java', (2, 2)), (u'hive', (2, 2)), (u'python', (2, 2))]
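
Since the last argument of aggregateByKey plays the same role as combineByKey's mergeCombiners, here is a minimal sketch of the same (count, sum) aggregation written with combineByKey, reusing the rdd defined above:

print rdd.combineByKey(lambda v: (1, v),                                   # createCombiner: start (count, sum) from the first value
                       lambda acc, v: (acc[0] + 1, acc[1] + v),            # mergeValue: fold a value into the accumulator
                       lambda a, b: (a[0] + b[0], a[1] + b[1])).collect()  # mergeCombiners: merge accumulators across partitions

This should give the same (count, sum) pairs as the two calls above.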

If you are trying to average the values, you can add another mapValues at the end like so:

print rdd.aggregateByKey( (0, 0),\
                         lambda acc, val: (acc[0] + 1,acc[1] + val),\
                         lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1]))\
                        .mapValues(lambda x: x[1] * 1.0 / x[0])\
                        .collect()

[(u'spark', 1.0), (u'java', 1.0), (u'hive', 1.0), (u'python', 1.0)]
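
The reduceByKey route can be finished the same way; a minimal sketch, reusing the rdd from above:

print rdd.mapValues(lambda x: (1, x))\
         .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
         .mapValues(lambda x: x[1] * 1.0 / x[0])\
         .collect()

It should print the same averages as the aggregateByKey version above.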
