5

I have a List of Map:

val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)

I want to group them by the id value and sum the value fields, in the most efficient way, so it becomes:

Map(A -> 25, B -> 10, C -> 4, D -> 60)
null
  • 8,669
  • 16
  • 68
  • 98
  • Will the Maps be always just the two elements? And why use a Map when the values are of two different types? It means unpleasant casts as you can see from the answers. Bit of a design smell here... – The Archetypal Paul Jan 30 '15 at 12:14
  • Which one? The `val list` or the expected result? The map in `val list` can have other key/value. I will update my example. Also, this question has nothing to do with real world app design. – null Jan 30 '15 at 12:29
  • Thanks. But I do think the awkwardness of a Map of String -> Any is worth a comment... – The Archetypal Paul Jan 30 '15 at 12:43
  • I'm aware of that. Please consider it part of the question's challenge :) – null Jan 30 '15 at 12:58

4 Answers

7

A) This one is the most readable, and it performs well if you have many items with the same id:

scala> list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)
res14: scala.collection.immutable.Map[Any,Int] = Map(D -> 60, A -> 25, C -> 4, B -> 10)

You can also use list.groupBy(_("id")).par... as well. It will be faster only if you have many elements with the same key; otherwise it will be extremely slow.

Otherwise, thread context switching itself will make the .par version slower, as map(_("value")).sum (your nested map-reduce) may be faster than the switching between threads. If N = the number of cores in the system, your map-reduce should be about N times slower than that overhead to benefit from par, roughly speaking of course.

B) So, if parallelizing didn't work so well (it's better to check that with performance tests), you can just "reimplement" groupBy in a specialized way:

val m = scala.collection.mutable.Map[String, Int]() withDefaultValue(0)
for (e <- list; k = e("id").toString) m.update(k, m(k) + e("value").asInstanceOf[Int])
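For reference, running option (B) end-to-end on the sample list from the question (a self-contained sketch) produces the expected totals:

```scala
// Sample input from the question
val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)

// Mutable map with default value 0, so unseen keys start at zero
val m = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
for (e <- list; k = e("id").toString)
  m.update(k, m(k) + e("value").asInstanceOf[Int])

assert(m == Map("A" -> 25, "B" -> 10, "C" -> 4, "D" -> 60))
```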

C) The most parallelized option is:

val m = new scala.collection.concurrent.TrieMap[String, Int]()
for (e <- list.par; k = e("id").toString) {
    def replace = {           
       val v = m(k)
       m.replace(k, v, v + e("value").asInstanceOf[Int]) //atomic
    }
    m.putIfAbsent(k, 0) //atomic
    while(!replace){} //in case of conflict
}

scala> m
res42: scala.collection.concurrent.TrieMap[String,Int] = TrieMap(B -> 10, C -> 4, D -> 60, A -> 25)

D) The most parallelized in functional style (slower, as it merges maps every time, but best for distributed map-reduce without shared memory), using scalaz semigroups:

import scalaz._; import Scalaz._
scala> list.map(x => Map(x("id").asInstanceOf[String] -> x("value").asInstanceOf[Int]))
    .par.reduce(_ |+| _)
res3: scala.collection.immutable.Map[String,Int] = Map(C -> 4, D -> 60, A -> 25, B -> 10)

But it will outperform the others only if you use some more complex aggregation than "+".
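As asked in the comments below, the |+| merge from (D) can also be reproduced without scalaz, by merging the maps by hand. A minimal sketch (sequential here; insert .par before .reduce where parallel collections are available):

```scala
// Sample input from the question
val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)

// What |+| does for Map[String, Int]: for each key, sum the values of both maps
def merge(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m2.foldLeft(m1) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

val merged = list
  .map(x => Map(x("id").asInstanceOf[String] -> x("value").asInstanceOf[Int]))
  .reduce(merge)

assert(merged == Map("A" -> 25, "B" -> 10, "C" -> 4, "D" -> 60))
```

merge is associative, which is exactly why it is safe to hand to a parallel reduce.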


So let's do simple performance testing:

def time[T](n: Int)(f: => T) = {
  val start = System.currentTimeMillis()
  for(i <- 1 to n) f
  (System.currentTimeMillis() - start).toDouble / n
}
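For example, timing option (A) over the question's sample list (a sketch; absolute numbers will differ per machine):

```scala
// Sample input from the question
val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)

// Average wall-clock time of f over n runs, in milliseconds
def time[T](n: Int)(f: => T): Double = {
  val start = System.currentTimeMillis()
  for (i <- 1 to n) f
  (System.currentTimeMillis() - start).toDouble / n
}

val avgMs = time(10000) {
  list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum).toMap
}
println(f"groupBy.mapValues: $avgMs%.5f ms per call")
```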

Tests were done in the Scala 2.12 REPL with JDK 8 on a MacBook Pro (2.3 GHz Intel Core i7). Every test was launched twice - the first time to warm up the JVM.

1) For your input collection and time(100000){...}, from slowest to fastest:

`par.groupBy.par.mapValues` = 0.13861 ms
`groupBy.par.mapValues` = 0.07667 ms
`most parallelized` = 0.06184 ms    
`scalaz par.reduce(_ |+| _)` = 0.04010 ms //same for other reduce-based implementations, mentioned here
`groupBy.mapValues` = 0.00212 ms
`for` + `update` with mutable map initialization time = 0.00201 ms
`scalaz suml` = 0.00171 ms      
`foldLeft` from another answer = 0.00114 ms
`for` + `update` without mutable map initialization time = 0.00105 ms

So, foldLeft from another answer seems to be the best solution for your input.

2) Let's make it bigger

 scala> val newlist = (1 to 1000).map(_ => list).reduce(_ ++ _)

Now with newlist as input and time(1000){...}:

 `scalaz par.reduce(_ |+| _)` = 1.422 ms
 `foldLeft`/`for` = 0.418 ms
 `groupBy.par.mapValues` = 0.343 ms

And it's better to choose groupBy.par.mapValues here.

3) Finally, let's define another aggregation:

scala> implicit class RichInt(i: Int){ def ++ (i2: Int) = { Thread.sleep(1); i + i2}}
defined class RichInt

And test it with list and time(1000):

`foldLeft` = 7.742 ms
`most parallelized` = 3.315 ms

So it's better to use most parallelized version here.


Why reduce is so slow:

Let's take 8 elements. Reduction produces a calculation tree from the leaves [1] + ... + [1] to the root [1 + ... + 1]:

time(([1] + [1]) + ([1] + [1]) + ([1] + [1]) + ([1] + [1]) 
   => ([1 +1] + [1 +1]) + ([1 + 1] + [1 + 1]) 
   => [1 + 1 + 1 + 1] + [1 + 1 + 1 + 1]) 
 = (1 + 1 + 1 + 1) +  (2 + 2) + 4 = 12

time(N = 8) = 8/2 + 2*8/4 + 4*8/8 = 8 * (1/2 + 2/4 + 4/8) = 8 * log2(8)/ 2 = 12

Or just:

time(N) = N * log2(N)/2

Of course this formula works only for numbers that are actually powers of 2. Anyway, the complexity is O(N log N), which is slower than foldLeft's O(N). Even after parallelization it only becomes O(N), so this implementation makes sense only for Big Data style distributed map-reduce, or, simply put, when you don't have enough memory and are storing your Map in some cache.

You may notice that it parallelizes better than the other options for your input - that's just because for 6 elements it's not so slow (almost O(1) here) and only one reduce call is made, whereas the other options group the data first or simply create more threads, which leads to more "thread switching" overhead. Simply put, reduce creates fewer threads here. But if you have more data, it doesn't win anymore, of course (see experiment 2).

dk14
  • 22,206
  • 4
  • 51
  • 88
  • I want to say "you again" but it will sound cliche :). Anyway, which one is better between `list.par.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).sum)` and `list.groupBy(_("id")).par.mapValues(_.map(_("value").asInstanceOf[Int]).sum)`? – null Jan 30 '15 at 13:26
  • shortly, it depends on your collection - so it's better to run performance tests to choose appropriate solution. – dk14 Jan 30 '15 at 14:00
  • As always, your answer always bring something new to me :). Just a bit suggestion: don't use `map` for variable name e.g. `val map`, it can be mistaken with map method. – null Jan 30 '15 at 15:06
  • Ok. What would you say about one-letter naming :) http://www.cse.unsw.edu.au/~cs3161/14s2/StyleGuide.html (I know, Scala isn't Haskell) – dk14 Jan 30 '15 at 16:34
  • I'm fine with that :) – null Jan 30 '15 at 17:11
  • Awesome update :). So this one is the badass? `for + update without mutable map initialization = 0.00105`, but where is the code? – null Jan 30 '15 at 18:48
  • this (version from my answer with `for (e <- list; `...) is most efficient for your input, along with `foldLeft` from another answer. But other versions are better for bigger inputs. – dk14 Jan 30 '15 at 18:56
  • I'm confused at `without mutable map initialization`, does it mean don't use mutable map and being initialized, or just don't initialize it with `withDefaultValue`? – null Jan 30 '15 at 19:11
  • it means that I didn't count the time for `val m = mutable.Map` here, just as I excluded the time for `.par` itself in `without parallel collection init`. If you have a big collection - they will be less significant – dk14 Jan 30 '15 at 19:14
  • Ah, I see. Btw was the `list.par.groupBy` version included in the test? – null Jan 30 '15 at 19:22
  • I called it `groupBy.par.mapValues` – dk14 Jan 30 '15 at 19:23
  • it's the best choice if you have a collection where one id has many many elements to sum (see option 2 in testing) – dk14 Jan 30 '15 at 19:25
  • So `list.groupBy.par` and `list.par.groupBy` are roughly equal in performance? Also, is it possible to do with `reduceLeft`? I think I heard somewhere that reduce is better in performance than fold. – null Jan 30 '15 at 19:29
  • Oops sorry, misread - added `list.par.groupBy`; it's equal to `list.par.groupBy.par` but twice as slow as `list.groupBy.par` (for your input). – dk14 Jan 30 '15 at 19:39
  • as far as I can see [here](https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/TraversableOnce.scala) - no big difference. But `reduce` itself should be faster for parallel collections. It's possible to implement it with reduce, but you will have to merge maps for that - I've added an example with scalaz semigroups, but it's not so fast. – dk14 Jan 30 '15 at 20:25
  • * `reduce` is parallelizable, `reduceLeft` - isn't – dk14 Jan 30 '15 at 20:34
  • That's interesting, can you show me how it looks like if using `reduce`? Also, how could reduceLeft isn't parallelizable? Looking from the source code, reduce invoke reduceLeft. – null Jan 30 '15 at 20:50
  • 1
    it's not source code for the parallel collection - just try `List(1,2,3,4,5,6,7,8).par.reduce((a, b) => {println(a + " " + b); a + b})` (execute it several times) - and compare them. I've already updated the answer with a reduce example (see D). reduce works with semigroups, which means associativity of some "+" operation, which means order doesn't matter here. reduceLeft executes operations in the same order as elements are present inside the list. – dk14 Jan 30 '15 at 20:53
  • What does |+| mean? Is it possible to solve with plain `reduce` without using scalaz? (just curious here, sorry for troubling) – null Jan 30 '15 at 21:27
  • yes - you have to implement merging two maps with summing elements with same keys by yourself then – dk14 Jan 30 '15 at 21:43
  • about |+| - see the link attached to "semigroups" word in my answer, you'll find some implementations there – dk14 Jan 30 '15 at 21:43
  • So the map merging would make the process slower? – null Jan 31 '15 at 09:03
  • Could you explain how `[1 + 1 + 1 + 1] + [1 + 1 + 1 + 1])` can turn into `(1 + 1 + 1 + 1) + (2 + 2) + 4`? (sorry for prolonging the comment chats, but i'm so curious here) – null Jan 31 '15 at 20:35
  • anyway, the right answer to your question was `foldLeft` (I'd prefer to remove reduce/reduceLeft from it as they are redundant). I've just demonstrated that parallelizing won't work for your small collection, and isn't much use even for bigger collections, as the "+" operation is pretty fast. – dk14 Jan 31 '15 at 20:53
  • pay attention to closing parenthesis - `[1 + 1 + 1 + 1] + [1 + 1 + 1 + 1]` is turning into `4` as you need 4 operations to merge maps with 4 elements each. `=>` means piping results from one level to another - i'm not good at pseudographics - just imagine the tree from leafs `[1] + ... + [1]` to root `[1 + ... + 1]`. – dk14 Jan 31 '15 at 21:04
  • Ah, I see. I actually planned to let the community choose the answer (I'll award whoever gets the highest votes by a 2-3 vote diff) since foldLeft and your answer are spot on for small data & big data, but if you say so... Btw, about `"+" operation is pretty fast`, did you refer to the `for + update` code? – null Feb 01 '15 at 07:35
  • I refer just to simple + from 2 + 2 = 4)) – dk14 Feb 01 '15 at 07:36
  • Sorry - have no time - you can use my `time` function free - it's not proprietary :) – dk14 Feb 01 '15 at 07:40
  • Ok, sorry for troubling. Thanks for the answer. Really appreciate your contribution :) – null Feb 01 '15 at 07:50
6

Also using foldLeft:

list.foldLeft(Map[String, Int]().withDefaultValue(0))((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key) + v("value").asInstanceOf[Int]))
})

UPDATE: with reduceLeft:

(Map[String, Any]().withDefaultValue(0) :: list).reduceLeft((res, v) => {
  val key = v("id").toString
  res + (key -> (res(key).asInstanceOf[Int] + v("value").asInstanceOf[Int]))
})

By the way, if you look at the reduceLeft definition you'll see that it uses the same foldLeft:

  def reduceLeft[B >: A](f: (B, A) => B): B =
    if (isEmpty) throw new UnsupportedOperationException("empty.reduceLeft")
    else tail.foldLeft[B](head)(f)

UPDATE 2: with par and reduce. The problem here is distinguishing a result Map from an initial input Map; I chose contains("id") for that.

list.par.reduce((a, b) => {
  def toResultMap(m: Map[String, Any]) =
    if (m.contains("id"))
      Map(m("id").toString -> m("value")).withDefaultValue(0)
    else m
  val aM = toResultMap(a)
  val bM = toResultMap(b)
  aM.foldLeft(bM)((res, v) =>
    res + (v._1 -> (res(v._1).asInstanceOf[Int] + v._2.asInstanceOf[Int])))
})
user5102379
  • 1,492
  • 9
  • 9
  • the performance is same as in scalaz's `|+|` and `list.map(x => Map(x("id").asInstanceOf[String] -> x("value").asInstanceOf[Int])).par.reduce((map1,map2) => map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k,0)) })` from [this](http://stackoverflow.com/a/7080321/1809978) answer – dk14 Jan 30 '15 at 22:02
  • it's 40 times slower than your `foldLeft` – dk14 Jan 30 '15 at 22:11
3

I don't know about "most efficient", but the nicest way I can think of is using scalaz suml, which uses Monoid; the Monoid for Map does exactly what you want. The only ugly part is turning those Map[String, Any]s into something better-typed that represents the structure we want (e.g. Map("A" → 20)).

import scalaz._, Scalaz._
list.map{m => 
  Map(m("id").asInstanceOf[String] → m("value").asInstanceOf[Int])
}.suml
lmm
  • 17,386
  • 3
  • 26
  • 37
1

Starting Scala 2.13, you can use the groupMapReduce method which is (as its name suggests) an equivalent of a groupBy followed by mapValues and a reduce step:

// val list = List(Map("id" -> "A", "value" -> 20, "name" -> "a"), Map("id" -> "B", "value" -> 10, "name" -> "b"), Map("id" -> "A", "value" -> 5, "name" -> "a"), Map("id" -> "C", "value" -> 1, "name" -> "c"), Map("id" -> "D", "value" -> 60, "name" -> "d"), Map("id" -> "C", "value" -> 3, "name" -> "c"))
list.groupMapReduce(_("id"))(_("value").asInstanceOf[Int])(_ + _)
// Map("A" -> 25, "B" -> 10, "C" -> 4, "D" -> 60)

This:

  • groups Maps by their "id" field (_("id")) (group part of groupMapReduce)

  • maps each grouped Map to their "value" field typed back to Int (_("value").asInstanceOf[Int]) (map part of groupMapReduce)

  • reduces values within each group (_ + _) by summing them (reduce part of groupMapReduce).

This is a one-pass equivalent of:

list.groupBy(_("id")).mapValues(_.map(_("value").asInstanceOf[Int]).reduce(_ + _)).toMap
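For pre-2.13 versions, the same one-pass behavior can be sketched with a single foldLeft that mirrors groupMapReduce's three parts (key by "id", map to the Int "value", combine with +); this is only an illustrative sketch, not a standard-library method:

```scala
// Sample input from the question
val list = List(
  Map("id" -> "A", "value" -> 20, "name" -> "a"),
  Map("id" -> "B", "value" -> 10, "name" -> "b"),
  Map("id" -> "A", "value" -> 5, "name" -> "a"),
  Map("id" -> "C", "value" -> 1, "name" -> "c"),
  Map("id" -> "D", "value" -> 60, "name" -> "d"),
  Map("id" -> "C", "value" -> 3, "name" -> "c")
)

// One pass over the list: group key, mapped value, and reduce op inlined
val result = list.foldLeft(Map.empty[Any, Int]) { (acc, m) =>
  val k = m("id")                         // group part
  val v = m("value").asInstanceOf[Int]    // map part
  acc.updated(k, acc.get(k).fold(v)(_ + v)) // reduce part: +
}

assert(result == Map("A" -> 25, "B" -> 10, "C" -> 4, "D" -> 60))
```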
Xavier Guihot
  • 54,987
  • 21
  • 291
  • 190