
A previous process gave me the accumulator and count of every group in the following form:

val data: Array[(Int, (Double, Int))] = Array((2,(2.1463120403829962,7340)), (1,(1.4532644653720025,4280)))

The structure is (groupId, (accum, count)).

Now I want to reduce it to get the sum over all tuples. Given:

(k1,(a1,n1)),(k2,(a2,n2))

I need:

(a1+a2, n1+n2)

It sounds like a simple task, so I did:

val mainMean = groups.reduce((acc,v)=>(acc._1 + v._1,acc._2 + v._2))

And I get:

:33: error: type mismatch;
 found   : (Double, Int)
 required: String
val mainMean = groups.reduce((acc,v)=>(acc._1 + v._1,acc._2 + v._2))

I also tried:

val mainMean = groups.reduce((k,(acc,v))=>(acc._1 + v._1,acc._2 + v._2))

and the compiler tells me: Note: Tuples cannot be directly destructured in method or function parameters. Either create a single parameter accepting the Tuple2, or consider a pattern matching anonymous function

So I tried:

val mainMean = groups.reduce({case(k,(acc,n))=>(k,(acc._1+n._1,acc._1+n._2))})    

and got:

error: type mismatch; found : (Int, (Double, Int)) required: Int

I know it's a newbie question, but I am stuck on it.

Avishek Bhattacharya
user2232395
  • The previous step was `reduceByKey((acc,v)=>(acc._1 + v._1,acc._2 + v._2))`; now I only need to sum the accumulators and the counts of all groups. It can be done in two steps, as I showed, but I want to do it in one. – user2232395 Sep 27 '17 at 14:44

2 Answers


Working with tuples can be tricky.
Below is working code, followed by an explanation.

val data = Array((2,(2.1463120403829962,7340)), (1,(1.4532644653720025,4280)))

def tupleSum(t1: (Int, (Double, Int)), t2: (Int, (Double, Int))): (Int, (Double, Int)) =
    (0,(t1._2._1 + t2._2._1, t1._2._2 + t2._2._2))

val mainMean = data.reduce(tupleSum)._2

We can write out the reduce arguments explicitly:

data.reduce((tuple1, tuple2) => tupleSum(tuple1, tuple2))

where tuple1 acts as the accumulator: on the first call it holds the first element of the array, and each subsequent element is added to it.
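A slightly different route (not from the original answer): strip the group ids with map before reducing, so the accumulator is a plain (accum, count) pair and no dummy 0 key is needed. This is a minimal sketch assuming the data from the question; the object name MapThenReduce is only for illustration.

```scala
object MapThenReduce {
  val data: Array[(Int, (Double, Int))] =
    Array((2, (2.1463120403829962, 7340)), (1, (1.4532644653720025, 4280)))

  // Drop the group id first, so reduce only ever sees (accum, count) pairs.
  val inner: Array[(Double, Int)] = data.map(_._2)

  // Element-wise sum of the inner tuples; acc is the running total.
  val total: (Double, Int) =
    inner.reduce((acc, v) => (acc._1 + v._1, acc._2 + v._2))
}
```

Here MapThenReduce.total holds the summed accumulators and the summed counts (11620) directly, without a key to discard.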

So if you want to perform the reduce using pattern matching, it looks like this:

val mainMean = data.reduce((tuple1, tuple2) => {
  val t1 = tuple1 match { case (i, t) => t }
  val t2 = tuple2 match { case (i, t) => t }
  // now t1 and t2 represent the inner tuples of the input tuples
  (0, (t1._1 + t2._1, t1._2 + t2._2))
})


UPD. I rewrote the previous listing, adding type annotations and println statements. I hope it helps to get the point across. An explanation follows the output.

val data = Array((3, (3.0, 3)), (2,(2.0,2)), (1,(1.0,1)))

val mainMean = data.reduce((tuple1: (Int, (Double, Int)),
                             tuple2: (Int, (Double, Int))) => {
    println("tuple1: " + tuple1)
    println("tuple2: " + tuple2)

    val t1: (Double, Int) = tuple1 match {
        case (i: Int, t: (Double, Int)) => t
    }
    val t2: (Double, Int) = tuple2 match {
        case (i: Int, t: (Double, Int)) => t
    }
    // now t1 and t2 represent the inner tuples of the input tuples
    (0, (t1._1 + t2._1, t1._2 + t2._2))}
)
println("mainMean: " + mainMean)

And the output will be:

tuple1: (3,(3.0,3)) // 1st element of the array
tuple2: (2,(2.0,2)) // 2nd element of the array
tuple1: (0,(5.0,5)) // sum of 1st and 2nd elements
tuple2: (1,(1.0,1)) // 3rd element
mainMean: (0,(6.0,6)) // result sum


tuple1 and tuple2 both have type (Int, (Double, Int)). We know they always have exactly this type, which is why a single case in the pattern match is enough. We unpack tuple1 into i: Int and t: (Double, Int). Since we are not interested in the key, we return only t, so t1 now holds the inner tuple of tuple1. The same goes for tuple2 and t2.
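The compiler note quoted in the question points at the same idea taken one step further: in a pattern-matching anonymous function passed to reduce, the single case pattern covers both arguments at once, and nested patterns can bind the inner fields directly, with _ ignoring the keys. A sketch on the same sample data as the listing above; CaseReduce is an illustrative name.

```scala
object CaseReduce {
  val data: Array[(Int, (Double, Int))] =
    Array((3, (3.0, 3)), (2, (2.0, 2)), (1, (1.0, 1)))

  // One case whose pattern is the pair of arguments (tuple1, tuple2);
  // the nested patterns bind a1, n1, a2, n2 and skip the keys with _.
  val mainMean: (Int, (Double, Int)) = data.reduce {
    case ((_, (a1, n1)), (_, (a2, n2))) => (0, (a1 + a2, n1 + n2))
  }
}
```

With this data, CaseReduce.mainMean is (0,(6.0,6)), the same result as the println trace above.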

You can find more information about fold functions here and here
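Along the lines of those fold functions, here is a foldLeft variant with an explicit zero value, which, unlike reduce, also handles an empty array (reduce on an empty array throws an UnsupportedOperationException). A minimal sketch: the object and method names are illustrative, and computing a mean at the end is an assumption about what mainMean is meant for.

```scala
object FoldTotals {
  val data: Array[(Int, (Double, Int))] =
    Array((2, (2.1463120403829962, 7340)), (1, (1.4532644653720025, 4280)))

  // Start from (0.0, 0) and add each element's inner (accum, count) pair.
  def totals(xs: Array[(Int, (Double, Int))]): (Double, Int) =
    xs.foldLeft((0.0, 0)) { case ((accSum, accCount), (_, (s, n))) =>
      (accSum + s, accCount + n)
    }

  val (sum, count) = totals(data)

  // Assumption: the overall mean is the end goal.
  val mean: Double = sum / count
}
```

For an empty input, totals simply returns the zero value (0.0, 0).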

wopqw
  • Thank you @wopqw, that is exactly what I wanted; it is a tuple-unpacking problem. The solution with the function is easy to understand, but the pattern matching is more complicated. Can you give a more detailed explanation of it? I read some examples of pattern-matching anonymous functions, but with only one case, what is the meaning of t? – user2232395 Sep 28 '17 at 18:44
  • @user2232395, I updated the answer. I hope it is clearer now. Feel free to ask if something is unclear. – wopqw Sep 29 '17 at 15:50

Try:

val arr = Seq((1, (1,1)), (1, (2,2)), (2, (3,3)))
val rdd = sc.parallelize(arr)

rdd.
    reduceByKey{ case (acc, t) => (acc._1 + t._1, acc._2 + t._2)}.
    collect
// Array((1,(3,3)), (2,(3,3)))
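Note that reduceByKey keeps one result per key, as in the output above, rather than one overall total. To see those per-key semantics without a SparkContext, the same computation can be sketched on a plain Scala collection with groupBy and reduce; this illustrates what is computed, not how Spark executes it, and PerKeyLocal is an illustrative name.

```scala
object PerKeyLocal {
  val arr: Seq[(Int, (Int, Int))] = Seq((1, (1, 1)), (1, (2, 2)), (2, (3, 3)))

  // Group by key, then reduce each group's values pairwise,
  // mirroring what reduceByKey computes for each key.
  val perKey: Map[Int, (Int, Int)] = arr.groupBy(_._1).map { case (k, pairs) =>
    k -> pairs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
  }
}
```

PerKeyLocal.perKey is Map(1 -> (3,3), 2 -> (3,3)), matching the collected RDD.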
Sergey Bushmanov