
I'm very new to both Scala and Spark, so please forgive me if I'm going about this completely wrong. After taking in a CSV file, filtering, and mapping, I have an RDD of (String, Double) pairs.

(b2aff711,-0.00510)
(ae095138,0.20321)
(etc.)

When I use .groupByKey() on the RDD,

val grouped = rdd1.groupByKey()

I get an RDD of (String, Iterable[Double]) pairs. (I don't know what CompactBuffer means; maybe it could be causing my issue?)

(32540b03,CompactBuffer(-0.00699, 0.256023))
(a93dec11,CompactBuffer(0.00624))
(32cc6532,CompactBuffer(0.02337, -0.05223, -0.03591))
(etc.)

Once they are grouped, I am trying to take the mean and the standard deviation. I want to simply use .mean() and .sampleStdev(). When I try to create a new RDD of the means,

val mean = grouped.mean()

an error is returned

Error:(51, 22) value mean is not a member of org.apache.spark.rdd.RDD[(String, Iterable[Double])]

val mean = grouped.mean( )

I have imported org.apache.spark.SparkContext._
I also tried .sampleStdev(), .sum(), and .stats(), with the same results. Whatever the problem is, it appears to be affecting all of the numeric RDD operations.
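As a local sanity check (a minimal plain-Scala sketch of my own, no Spark; the sample pairs are taken from the data above), the per-key mean I'm after looks like this:

```scala
// Plain-Scala model of the per-key mean (no Spark involved).
// The pairs mirror two of the keys shown above.
val pairs = List(("32540b03", -0.00699), ("32540b03", 0.256023),
                 ("a93dec11", 0.00624))

// groupBy yields Map[String, List[(String, Double)]]; keep only the
// Double values of each group and reduce them to a mean.
val meansByKey: Map[String, Double] =
  pairs.groupBy(_._1).map { case (key, kvs) =>
    val vs = kvs.map(_._2)
    (key, vs.sum / vs.size)
  }
```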

the3rdNotch
    I asked and answered a similar question a while back, and it (the following URL) may help you and others: https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth – NYCeyes Jan 12 '18 at 01:00

3 Answers

4

Let's consider the following:

val data = List(("32540b03",-0.00699), ("a93dec11",0.00624),
                ("32cc6532",0.02337) , ("32540b03",0.256023),
                ("32cc6532",-0.03591),("32cc6532",-0.03591))

val rdd = sc.parallelize(data.toSeq).groupByKey().sortByKey()

One way to compute the mean for each key is the following:

First, define a generic average method:

def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
  // Sum the group using the Numeric evidence, then divide by its size.
  num.toDouble(ts.sum) / ts.size
}

You can apply the method to the RDD as follows:

val avgs = rdd.map(x => (x._1, average(x._2)))

You can check:

avgs.take(3)

and this is the result:

res4: Array[(String, Double)] = Array((32540b03,0.1245165), (32cc6532,-0.016149999999999998), (a93dec11,0.00624))
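Since `average` is plain Scala, its behavior can be checked locally without a SparkContext; the groups below match the sample data in this answer:

```scala
// Same generic helper as in the answer: works for any Numeric element type.
def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double =
  num.toDouble(ts.sum) / ts.size

val groupA = Iterable(-0.00699, 0.256023)           // key 32540b03
val groupB = Iterable(0.02337, -0.03591, -0.03591)  // key 32cc6532
```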
eliasah
1

The efficient way would be to use reduceByKey instead of groupByKey.

val result = sc.parallelize(data)  // `data`: the (String, Double) pairs above
  .map { case (key, value) => (key, (value, 1)) }
  .reduceByKey { case ((value1, count1), (value2, count2)) =>
    (value1 + value2, count1 + count2) }
  .mapValues { case (value, count) => value / count }

On the other hand, the problem with your solution is that grouped is an RDD of (String, Iterable[Double]) pairs (just as the error says). You can calculate the mean of an RDD of Ints or Doubles, but what would the mean of an RDD of pairs be?
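The (sum, count) accumulator that reduceByKey merges is an ordinary fold, so the same logic can be sketched and checked in plain Scala (no Spark; the sample pairs here are mine):

```scala
// One (sum, count) accumulator per key, merged exactly as reduceByKey would.
val data = List(("32540b03", -0.00699), ("a93dec11", 0.00624),
                ("32540b03", 0.256023))

val sumCounts: Map[String, (Double, Int)] =
  data.foldLeft(Map.empty[String, (Double, Int)]) {
    case (acc, (key, value)) =>
      val (s, c) = acc.getOrElse(key, (0.0, 0))
      acc.updated(key, (s + value, c + 1))
  }

// Final divide, mirroring the mapValues step above.
val means = sumCounts.map { case (k, (s, c)) => (k, s / c) }
```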

abalcerek
  • @abalcerek, would that work if you have multiple values per key? – E B Feb 12 '17 at 00:26
  • @EB I'm not sure I understand, but if you want to calculate it for RDDs of elements `(key, value: List[Double])`, then you have to change the first map to `{ case (key, value) => (key, (value.sum, value.size)) }` – abalcerek Feb 12 '17 at 10:48
1

Here is a full program without a custom function:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("means").setMaster("local[*]")
val sc = new SparkContext(conf)

val data = List(("Lily", 23), ("Lily", 50),
                ("Tom", 66), ("Tom", 21), ("Tom", 69),
                ("Max", 11), ("Max", 24))

val rdd = sc.parallelize(data)

// Pair each value with a count of 1, sum both per key, then divide.
val counts = rdd.map(item => (item._1, (1, item._2.toDouble)))
val countSums = counts.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
val keyMeans = countSums.mapValues(avgCount => avgCount._2 / avgCount._1)

for ((key, mean) <- keyMeans.collect()) println(key + " " + mean)
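Since the question also asked for the sample standard deviation, the same per-key grouping extends to it. A plain-Scala sketch (no Spark; `sampleStdev` here is my own helper using the usual n-1 formula, not Spark's method):

```scala
// Sample standard deviation per key: sqrt(sum((x - mean)^2) / (n - 1)).
val scores = List(("Lily", 23.0), ("Lily", 50.0),
                  ("Tom", 66.0), ("Tom", 21.0), ("Tom", 69.0))

def sampleStdev(xs: Seq[Double]): Double = {
  val m = xs.sum / xs.size
  math.sqrt(xs.map(x => math.pow(x - m, 2)).sum / (xs.size - 1))
}

val stdevByKey: Map[String, Double] =
  scores.groupBy(_._1).map { case (k, kvs) => (k, sampleStdev(kvs.map(_._2))) }
```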
Max