Given the following code, I'm trying to calculate the average of the floating-point column on a per-month basis.
rdd = sc.parallelize(
    [['JAN', 'NY', 3.0],
     ['JAN', 'PA', 1.0],
     ['JAN', 'NJ', 2.0],
     ['JAN', 'CT', 4.0],
     ['FEB', 'PA', 1.0],
     ['FEB', 'NJ', 1.0],
     ['FEB', 'NY', 2.0],
     ['FEB', 'VT', 1.0],
     ['MAR', 'NJ', 2.0],
     ['MAR', 'NY', 1.0],
     ['MAR', 'VT', 2.0],
     ['MAR', 'PA', 3.0]])
def avg_map_func(row):
    return (row[0], (row[2], 1))

def avg_reduce_func(value1, value2):
    return (value1[0], (value1[1][0] + value2[1][0], value1[1][1] + value2[1][1]))

rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()
From a high-level point of view, I first tried to use map to create an RDD of the following form:
[('JAN', (3.0, 1)),
('JAN', (1.0, 1)),
('JAN', (2.0, 1)),
('JAN', (4.0, 1)),
('FEB', (1.0, 1)),
('FEB', (1.0, 1)),
('FEB', (2.0, 1)),
('FEB', (1.0, 1)),
('MAR', (2.0, 1)),
('MAR', (1.0, 1)),
('MAR', (2.0, 1)),
('MAR', (3.0, 1))]
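The map step itself can be checked without Spark at all; applying the mapper to each row with an ordinary list comprehension should produce exactly the (month, (value, 1)) pairs listed above (shown here on a shortened, assumed subset of the data):

```python
# Plain-Python check of the map step, no SparkContext required.
def avg_map_func(row):
    # (month, (float value, count of 1)) -- the count lets the reducer
    # later tally how many rows contributed to each month's total.
    return (row[0], (row[2], 1))

data = [['JAN', 'NY', 3.0], ['JAN', 'PA', 1.0], ['FEB', 'PA', 1.0]]
pairs = [avg_map_func(row) for row in data]
# pairs == [('JAN', (3.0, 1)), ('JAN', (1.0, 1)), ('FEB', (1.0, 1))]
```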
Then, I wanted to use the reduceByKey function to add up the ones and the floats by key, creating a new RDD with one row per month, where each row is a tuple holding the total of the floats and an integer count of the rows. For example, the JAN row would look like this:
('JAN', (10.0, 4))
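To make the result I'm after concrete, this is the per-key sum/count aggregation in plain Python (a sketch using a dict rather than Spark, on the JAN pairs only):

```python
# Plain-Python sketch of the intended aggregation: for each key, sum the
# floats and sum the counts, so JAN ends up as (10.0, 4).
pairs = [('JAN', (3.0, 1)), ('JAN', (1.0, 1)),
         ('JAN', (2.0, 1)), ('JAN', (4.0, 1))]

totals = {}
for month, (value, count) in pairs:
    running_value, running_count = totals.get(month, (0.0, 0))
    totals[month] = (running_value + value, running_count + count)
# totals == {'JAN': (10.0, 4)}
```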
However, I don't seem to be able to index into the tuple correctly, and I get a runtime error inside the reduceByKey function.
Question 1: Why can't I index into the tuple in avg_reduce_func?
Question 2: How can this code be rewritten to calculate the average of the floating-point column on a per-month basis?