
I am a beginner learning Apache Spark. Currently I am trying to learn various aggregations using Python.

To give some context to the problem I am facing: I am finding it difficult to understand how the aggregateByKey function works when calculating the number of orders by "status".

I am following a YouTube playlist by ITVersity and below is the code and some sample output I am working with.

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
for i in ordersRDD.take(10): print(i)

Output:
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT

ordersMap = ordersRDD.map(lambda x: (x.split(",")[3], x))

Output:
(u'CLOSED', u'1,2013-07-25 00:00:00.0,11599,CLOSED')
(u'PENDING_PAYMENT', u'2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT')
(u'COMPLETE', u'3,2013-07-25 00:00:00.0,12111,COMPLETE')
(u'CLOSED', u'4,2013-07-25 00:00:00.0,8827,CLOSED')
(u'COMPLETE', u'5,2013-07-25 00:00:00.0,11318,COMPLETE')
(u'COMPLETE', u'6,2013-07-25 00:00:00.0,7130,COMPLETE')
(u'COMPLETE', u'7,2013-07-25 00:00:00.0,4530,COMPLETE')
(u'PROCESSING', u'8,2013-07-25 00:00:00.0,2911,PROCESSING')
(u'PENDING_PAYMENT', u'9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT')
(u'PENDING_PAYMENT', u'10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT')

ordersByStatus = ordersMap.aggregateByKey(0, lambda acc, val: acc + 1, lambda acc,val: acc + val)
for i in ordersByStatus.take(10): print(i)

Final Output:
(u'SUSPECTED_FRAUD', 1558)
(u'CANCELED', 1428)
(u'COMPLETE', 22899)
(u'PENDING_PAYMENT', 15030)
(u'PENDING', 7610)
(u'CLOSED', 7556)
(u'ON_HOLD', 3798)
(u'PROCESSING', 8275)
(u'PAYMENT_REVIEW', 729)

The questions I have difficulty understanding are:
1. Why does the aggregateByKey function take in 2 lambda functions as parameters?
2. Can you visualize what the first lambda function does?
3. Can you visualize what the second lambda function does?

It would be very helpful if you could explain the above questions and also the workings of aggregateByKey, with a simple block diagram if possible, and perhaps a few intermediate calculations.

Appreciate your help!

Thanks,
Shiv


1 Answer


Spark RDDs are divided into partitions, so when you perform an aggregation over all of your data, the data inside each partition is aggregated first (a partition is just a subdivision of the data). Then you need to tell Spark how to combine the partial results from the different partitions.
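As a rough illustration (using a tiny made-up RDD rather than your orders data), you can see how records land in partitions with glom(), which gathers each partition into a list:

pairs = sc.parallelize(
    [("CLOSED", 1), ("COMPLETE", 1), ("CLOSED", 1), ("PENDING", 1)],
    numSlices=2
)

# glom() groups each partition's records into a list so the split is visible
print(pairs.glom().collect())
# Possible output (the exact placement of records can differ):
# [[('CLOSED', 1), ('COMPLETE', 1)], [('CLOSED', 1), ('PENDING', 1)]]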

The first lambda function tells Spark how to change the running count (accumulator) when it encounters a new value. Since you're counting, you just add 1 to the accumulator. Within one slice, if the running count is currently 4 and another value is added, then the running count should be 4 + 1 = 5. So, your first lambda function is:

lambda acc, val: acc + 1
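You can simulate what happens inside a single partition with plain Python; this is only a sketch of the idea, not Spark's actual internals, and the three records are made up:

seq_func = lambda acc, val: acc + 1   # the first lambda from your code

# Pretend this partition happens to hold three CLOSED orders
partition_values = ["1,...,CLOSED", "4,...,CLOSED", "13,...,CLOSED"]

acc = 0   # the zeroValue (0) you passed to aggregateByKey
for val in partition_values:
    acc = seq_func(acc, val)
print(acc)   # prints 3: the running count for CLOSED within this one partition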

The second lambda function tells Spark how to combine the running count from one slice of your data with the running count from another slice of your data. If one slice had a count of 5 and a second slice had a count of 7, then the combined count is 5 + 7 = 12. So your second function would be better written like:

lambda acc1, acc2: acc1 + acc2
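Continuing the same sketch with made-up numbers, the combine step just adds the per-partition counts for each key:

comb_func = lambda acc1, acc2: acc1 + acc2   # the second lambda, with clearer names

count_from_partition_1 = 5   # CLOSED count found in one partition
count_from_partition_2 = 7   # CLOSED count found in another partition
print(comb_func(count_from_partition_1, count_from_partition_2))   # 12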

The only subtlety left is that everything is done on a "by key" basis. The accumulators (counts) are different depending on the key.
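Putting it together on a tiny made-up RDD shaped like your ordersMap (status, full record) pairs, the call mirrors the one in your question:

mini = sc.parallelize([
    ("CLOSED",   "1,2013-07-25 00:00:00.0,11599,CLOSED"),
    ("COMPLETE", "3,2013-07-25 00:00:00.0,12111,COMPLETE"),
    ("CLOSED",   "4,2013-07-25 00:00:00.0,8827,CLOSED"),
], numSlices=2)

counts = mini.aggregateByKey(
    0,                                # zeroValue: initial count per key, per partition
    lambda acc, val: acc + 1,         # within a partition: bump the count for each record
    lambda acc1, acc2: acc1 + acc2    # across partitions: add the partial counts
)
print(counts.collect())
# [('CLOSED', 2), ('COMPLETE', 1)]   (order may vary)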

  • Thanks for your answer! This makes perfect sense now. I did upvote your answer but my reputation is not high enough for it to be shown publicly. Appreciate your help! – Shiv Konar Feb 02 '16 at 21:58