
I have an RDD[(String, Map[String, Int])]:

    [("A",Map("acs"->2,"sdv"->2,"sfd"->1)),("B",Map("ass"->2,"fvv"->2,"ffd"->1)),("A",Map("acs"->2,"sdv"->2,"sfd"->1))]

I want to merge the elements with the same key, summing the values of matching map keys, to get:

    [("A",Map("acs"->4,"sdv"->4,"sfd"->2)),("B",Map("ass"->2,"fvv"->2,"ffd"->1))]

How can I do this in Scala?

Rasika

2 Answers


If you define mapSum (see "merge two maps and sum values"):

def mapSum[T](map1: Map[T, Int], map2: Map[T, Int]): Map[T, Int] =
  map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0)) }
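To see what mapSum does on its own, here is a quick local check on plain Scala maps, no Spark needed. The object name MapSumDemo and the extra key "xyz" are illustrative only; the function body is the same as above.

```scala
object MapSumDemo {
  // Same mapSum as above: every entry of map2 is re-inserted with map1's value (or 0) added
  def mapSum[T](map1: Map[T, Int], map2: Map[T, Int]): Map[T, Int] =
    map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0)) }

  def main(args: Array[String]): Unit = {
    val a = Map("acs" -> 2, "sdv" -> 2, "sfd" -> 1)
    val b = Map("acs" -> 2, "sdv" -> 2, "xyz" -> 5) // "xyz" is a hypothetical key absent from a
    // Shared keys ("acs", "sdv") are summed; keys found in only one map keep their value
    println(mapSum(a, b))
  }
}
```

Because the result holds map1's keys plus map2's re-summed entries, mapSum(a, b) and mapSum(b, a) produce equal maps, which is what makes it safe to use as a reduce function.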

Then you can groupBy and reduce (similar to your other question):

@ rdd.groupBy(_._1).map(_._2.reduce((a, b) => (a._1, mapSum(a._2, b._2)))).collect
res11: Array[(String, Map[String, Int])] = Array(
  ("A", Map("acs" -> 4, "sdv" -> 4, "sfd" -> 2)),
  ("B", Map("ass" -> 2, "fvv" -> 2, "ffd" -> 1))
)
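The same groupBy-then-reduce logic can be tried on an ordinary local Seq, which makes it easy to test the merging without a SparkContext. This is a sketch that mirrors the RDD pipeline with standard collections; the names GroupByReduceDemo, mergeByKey and sample are hypothetical.

```scala
object GroupByReduceDemo {
  def mapSum[T](map1: Map[T, Int], map2: Map[T, Int]): Map[T, Int] =
    map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0)) }

  // Local analogue of rdd.groupBy(_._1).map(_._2.reduce(...)):
  // group the pairs by their String key, then fold each group's maps together with mapSum
  def mergeByKey(data: Seq[(String, Map[String, Int])]): Map[String, Map[String, Int]] =
    data.groupBy(_._1).map { case (key, pairs) =>
      key -> pairs.map(_._2).reduce((m1, m2) => mapSum(m1, m2))
    }

  // The input from the question, as a local Seq instead of an RDD
  val sample: Seq[(String, Map[String, Int])] = Seq(
    ("A", Map("acs" -> 2, "sdv" -> 2, "sfd" -> 1)),
    ("B", Map("ass" -> 2, "fvv" -> 2, "ffd" -> 1)),
    ("A", Map("acs" -> 2, "sdv" -> 2, "sfd" -> 1))
  )

  def main(args: Array[String]): Unit =
    mergeByKey(sample).foreach(println)
}
```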
Andy Hayden

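A more efficient approach is to use reduceByKey, which combines the values for each key within a partition before shuffling (whereas groupBy shuffles every record). The reduce function merges the accumulated Map with the next one by summing the values of matching keys: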

val rdd = sc.parallelize(Seq(
  ("A", Map("acs"->2, "sdv"->2, "sfd"->1)),
  ("B", Map("ass"->2, "fvv"->2, "ffd"->1)),
  ("A", Map("acs"->2, "sdv"->2, "sfd"->1))
))

rdd.reduceByKey( (acc, m) =>
  acc ++ m.map{ case (k, v) => (k, acc.getOrElse(k, 0) + v) }
).collect

// res1: Array[(String, scala.collection.immutable.Map[String,Int])] = Array(
//   (A,Map(acs -> 4, sdv -> 4, sfd -> 2)),
//   (B,Map(ass -> 2, fvv -> 2, ffd -> 1))
// )
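reduceByKey expects an associative and commutative function, because Spark may merge values in any order, first within each partition and then across partitions after the shuffle. The sketch below simulates that two-stage process locally with plain collections: partition1 and partition2 are a hypothetical split of the sample data, and combineLocally and mergePartials are illustrative helpers, not Spark APIs. The merge function itself is the one passed to reduceByKey above.

```scala
object ReduceByKeySketch {
  type Counts = Map[String, Int]

  // The same merge function passed to reduceByKey above
  val merge: (Counts, Counts) => Counts =
    (acc, m) => acc ++ m.map { case (k, v) => (k, acc.getOrElse(k, 0) + v) }

  // Hypothetical stand-in for the per-partition combine step
  def combineLocally(part: Seq[(String, Counts)]): Map[String, Counts] =
    part.foldLeft(Map.empty[String, Counts]) { case (out, (key, m)) =>
      out.updated(key, out.get(key).map(merge(_, m)).getOrElse(m))
    }

  // Hypothetical stand-in for merging partial results after the shuffle
  def mergePartials(a: Map[String, Counts], b: Map[String, Counts]): Map[String, Counts] =
    b.foldLeft(a) { case (out, (key, m)) =>
      out.updated(key, out.get(key).map(merge(_, m)).getOrElse(m))
    }

  // An illustrative split of the question's data into two "partitions"
  val partition1: Seq[(String, Counts)] = Seq(
    ("A", Map("acs" -> 2, "sdv" -> 2, "sfd" -> 1)),
    ("B", Map("ass" -> 2, "fvv" -> 2, "ffd" -> 1))
  )
  val partition2: Seq[(String, Counts)] = Seq(
    ("A", Map("acs" -> 2, "sdv" -> 2, "sfd" -> 1))
  )

  def main(args: Array[String]): Unit = {
    val result = mergePartials(combineLocally(partition1), combineLocally(partition2))
    result.foreach(println)
  }
}
```

Whichever way the records are split across partitions, the final per-key maps come out the same, because the merge only adds values for shared keys and keeps all other entries.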
Leo C