
When reduceByKey is called, it sums all values with the same key. Is there any way to calculate the average of the values for each key?

// I calculate the sum like this, but don't know how to calculate the avg
rdd.reduceByKey((x, y) => x + y).collect()


Array(((Type1,1),4.0), ((Type1,1),9.2), ((Type1,2),8), ((Type1,2),4.5), ((Type1,3),3.5), 
((Type1,3),5.0), ((Type2,1),4.6), ((Type2,1),4), ((Type2,1),10), ((Type2,1),4.3))
finman
  • I guess that you meant reduceByKey (as in the title). Please show an example of the code you tried. reduceByKey does not have a default aggregation, so if it computes the sum, that is what you asked it to do. – Wilmerton Oct 17 '16 at 13:23
  • 2
    Possible duplicate of [Spark RDD: How to calculate statistics most efficiently?](http://stackoverflow.com/questions/39981312/spark-rdd-how-to-calculate-statistics-most-efficiently) – mtoto Oct 17 '16 at 13:26
  • 2
    you need both sum and count separaterly, see e.g. http://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth – Raphael Roth Oct 17 '16 at 13:30
  • Documentation in Scala for aggregateByKey: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions – Wilmerton Oct 17 '16 at 13:30
  • I asked and answered a similar question a while back; it may help you and others: https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth – NYCeyes Jan 12 '18 at 01:02

2 Answers


One way is to use mapValues and reduceByKey, which is easier than aggregateByKey:

rdd
  .mapValues(value => (value, 1)) // pair each value with a count of 1
  .reduceByKey {
    case ((sumL, countL), (sumR, countR)) =>
      (sumL + sumR, countL + countR) // add sums and counts independently
  }
  .mapValues {
    case (sum, count) => sum / count // divide once per key at the end
  }
  .collect()
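
For example, here is a runnable sketch of the same pipeline, assuming an existing SparkContext sc and the sample pairs from the question (with the integer-looking values written as Doubles):

val rdd = sc.parallelize(Seq(
  (("Type1", 1), 4.0), (("Type1", 1), 9.2), (("Type1", 2), 8.0), (("Type1", 2), 4.5),
  (("Type1", 3), 3.5), (("Type1", 3), 5.0), (("Type2", 1), 4.6), (("Type2", 1), 4.0),
  (("Type2", 1), 10.0), (("Type2", 1), 4.3)
))

val averages = rdd
  .mapValues(v => (v, 1))
  .reduceByKey { case ((sumL, countL), (sumR, countR)) => (sumL + sumR, countL + countR) }
  .mapValues { case (sum, count) => sum / count }
  .collect()
// e.g. (ordering may vary):
// Array(((Type1,1),6.6), ((Type1,2),6.25), ((Type1,3),4.25), ((Type2,1),5.725))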

See also Learning Spark, ch. 4: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

gabi

There are lots of ways, but a simple one is to use a class that keeps track of your total and count and computes the average at the end. Something like this would work:

// Accumulates a running total and count; cnt defaults to 1 for a single value
class AvgCollector(val tot: Double, val cnt: Int = 1) {
  def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt)
  def avg = tot / cnt
}

val rdd2 = {
  rdd
  .map{ case (k,v) => (k, new AvgCollector(v)) }
  .reduceByKey(_ combine _)
  .map{ case (k,v) => (k, v.avg) }
}
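
Note that with reduceByKey no zero element is needed: every record is first wrapped in its own AvgCollector(v) with cnt defaulting to 1, and the reduction only ever merges existing collectors, so the plain tot / cnt in avg is safe here.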

... or you could use aggregateByKey with a tweak to the class:

class AvgCollector(val tot: Double, val cnt: Int = 1) {
  def ++(v: Double) = new AvgCollector(tot + v, cnt + 1) // fold in a single value
  def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt) // merge two collectors
  def avg = if (cnt > 0) tot / cnt else 0.0 // guard against the empty zero value
}

val rdd2 = {
  rdd
  .aggregateByKey( new AvgCollector(0.0, 0) )(_ ++ _, _ combine _)
  .map{ case (k,v) => (k, v.avg) }
}
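
For comparison, the same aggregateByKey pattern works with a plain (sum, count) tuple instead of a class; a minimal sketch (variable names are illustrative):

val averages = rdd
  .aggregateByKey((0.0, 0))(                // zero value: (sum, count)
    (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one value into a partition-local accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge accumulators across partitions
  )
  .mapValues { case (sum, count) => sum / count }
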
kmh