
I want to start with some implementation of a Map and accumulate data into it while iterating over a parallel collection. Keys can "overlap" between threads because they are generated probabilistically (from random numbers).

For example, thread 1 wants to add key = A, value = 1 to the map. If A is already mapped, it should add 1 to the existing value; if not, it should create the mapping A -> 1. Meanwhile, another thread has key = A and value = 2 and wants to do the same thing.
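A minimal sketch of that "create or add" step. It assumes Java 8 or later (`ConcurrentHashMap.merge` did not exist when this question was asked) and uses String keys with Long values. `MergeSketch`, `add` and `run` are hypothetical names. `merge` does the lookup and the update for a single key atomically, so neither thread's contribution is lost:

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.function.BiFunction

object MergeSketch {
  // Remapping function: combines the value already stored with the incoming one.
  private val sum: BiFunction[Long, Long, Long] = (current, incoming) => current + incoming

  // Atomically maps `key` to `delta` if it is absent, otherwise replaces the
  // stored value v with v + delta. Returns the value now stored for `key`.
  def add(counts: ConcurrentHashMap[String, Long], key: String, delta: Long): Long =
    counts.merge(key, delta, sum)

  // The scenario from the question: one thread repeatedly adds 1 to key "A",
  // another repeatedly adds 2 to the same key, at the same time.
  def run(iterations: Int): ConcurrentHashMap[String, Long] = {
    val counts = new ConcurrentHashMap[String, Long]()
    val pool = Executors.newFixedThreadPool(2)
    pool.execute(() => (1 to iterations).foreach(_ => add(counts, "A", 1L)))
    pool.execute(() => (1 to iterations).foreach(_ => add(counts, "A", 2L)))
    pool.shutdown()                            // accept no new tasks
    pool.awaitTermination(1, TimeUnit.MINUTES) // wait for both writers to finish
    counts
  }
}
```

With `run(10000)`, key "A" should end up at 10000 * 1 + 10000 * 2 = 30000, however the two threads interleave.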

Is there a way to do this without creating an entire Actor system?

ConcurrentHashMap from Java's library looks interesting, but its "weakly consistent" iterators make me worry about whether updating the map from several threads is safe.
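The sketch below shows what "weakly consistent" means in practice. It assumes Java 8 or later for `computeIfAbsent`, and `IterationSketch` and its parameters are hypothetical. Each per-key update stays atomic. Weak consistency only limits what an iterator running at the same time as writers is guaranteed to observe:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.function.{Function => JFunction}

object IterationSketch {
  // Creates a zero counter; only invoked when a key has no counter yet.
  private val newCounter: JFunction[Int, AtomicLong] = _ => new AtomicLong()

  // A writer thread adds 1 to every key in 0 until nKeys, `rounds` times over,
  // while the calling thread iterates the same map.
  // Returns (largest count seen during iteration, total after the writer finished).
  def run(nKeys: Int, rounds: Int): (Long, Long) = {
    val counts = new ConcurrentHashMap[Int, AtomicLong]()
    val writer = new Thread(() =>
      for (_ <- 1 to rounds; k <- 0 until nKeys)
        counts.computeIfAbsent(k, newCounter).incrementAndGet()
    )
    writer.start()

    // Weakly consistent: this loop never throws ConcurrentModificationException,
    // but it may or may not observe keys/increments added after it started.
    var maxSeen = 0L
    val during = counts.values().iterator()
    while (during.hasNext) maxSeen = math.max(maxSeen, during.next().get())

    // join() makes all of the writer's updates visible here, so a fresh
    // iteration sees every key with its final count.
    writer.join()
    var total = 0L
    val after = counts.values().iterator()
    while (after.hasNext) total += after.next().get()
    (maxSeen, total)
  }
}
```

The value of `maxSeen` depends on timing. The final total does not.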

adelbertc
  • There are other synchronization methods, and it really comes down to what the desired results/semantics are. –  Jul 16 '12 at 18:24
  • Regarding ConcurrentHashMap iterators, see http://stackoverflow.com/questions/3768554/is-iterating-concurrenthashmap-values-thread-safe. If these semantics are not "correct", what is? It is similar to choosing between READ COMMITTED, which allows non-repeatable (including phantom) reads, and SERIALIZABLE in a DB transaction: which one is "correct"? –  Jul 16 '12 at 18:32

1 Answer


That's a very trivial thing to do without Actors.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

class CountMap[K <: AnyRef](mapSize: Int = 16) extends ConcurrentHashMap[K, AtomicLong](mapSize) {
  def addCount(key: K): Long = (get(key) match { // Check value for key
    case null => // If not mapped yet
      val al = new AtomicLong(0) // Create a new thread-safe memory slot to keep the count in
      putIfAbsent(key, al) match { // Try to put our memory slot in atomically
        case null => al // If we succeeded, then our memory slot should be used
        case some => some // If there already was a memory slot, use that one
      }
    case some => some // If there already was a memory slot, use that one
  }).incrementAndGet() // Increment and get the current value of the slot associated with the given key

  def getCount(key: K): Long = get(key) match { // Get the memory slot associated with the key
    case null => 0L // If none, say it's 0
    case some => some.get() // If some, get its value
  }
}
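The answer's `addCount` always adds exactly 1, but the question's second thread wants to add 2. The following is a minimal variant that keeps the same get/putIfAbsent approach and swaps `incrementAndGet` for `AtomicLong.addAndGet`. `DeltaCountMap` and the `delta` parameter are hypothetical:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical wrapper: same get / putIfAbsent / AtomicLong idea as CountMap,
// but holds the map privately and accepts an arbitrary delta.
final class DeltaCountMap[K <: AnyRef](initialCapacity: Int = 16) {
  private val slots = new ConcurrentHashMap[K, AtomicLong](initialCapacity)

  // Returns the counter for `key`, installing a zero counter if none exists.
  // putIfAbsent guarantees that racing threads all end up with the same slot.
  private def slot(key: K): AtomicLong = slots.get(key) match {
    case null =>
      val fresh = new AtomicLong(0)
      slots.putIfAbsent(key, fresh) match {
        case null     => fresh    // we won the race; our slot is installed
        case existing => existing // another thread installed one first
      }
    case existing => existing
  }

  // Atomically adds `delta` and returns the new total for `key`.
  def addCount(key: K, delta: Long = 1L): Long = slot(key).addAndGet(delta)

  def getCount(key: K): Long = slots.get(key) match {
    case null     => 0L
    case existing => existing.get()
  }
}
```

This sketch wraps the map rather than extending ConcurrentHashMap on purpose. Callers cannot reach `put` or `remove` and replace a counter that another thread is still incrementing.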
Viktor Klang
  • Interesting - I assume I would just create an instance of this and use this to mutate values across threads? – adelbertc Jul 16 '12 at 19:22
  • I disagree that this is 'very trivial'. It requires detailed knowledge of the inner workings of ConcurrentHashMap. There are probably many more incorrect (bug-ridden) ways to extend such a class than there are correct ways. – Rick-777 Jul 18 '12 at 07:44
  • @Rick-777: It requires no detailed knowledge of the inner workings of CHM (ConcurrentHashMap). All you need to know is that ConcurrentMap/AtomicLong exists and that you can read JavaDoc. – Viktor Klang Jul 18 '12 at 09:14
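Suppose the answer to the first comment is yes, so one instance is shared by every worker. The sketch below shares a single map across several tasks. It uses `scala.concurrent.Future` as a stand-in for the parallel collection in the question. It also assumes Java 8 or later and uses `ConcurrentHashMap.computeIfAbsent` in place of the get/putIfAbsent pair. `SharedCounts`, `run` and the four-letter key alphabet are hypothetical:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.function.{Function => JFunction}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

object SharedCounts {
  // Creates a fresh zero counter; used only when a key is absent.
  private val newCounter: JFunction[String, AtomicLong] = _ => new AtomicLong()

  // One shared map, many concurrent tasks. Each task draws `perTask` random
  // keys from a small alphabet, so keys collide across tasks, as in the question.
  def run(tasks: Int, perTask: Int): ConcurrentHashMap[String, AtomicLong] = {
    val counts = new ConcurrentHashMap[String, AtomicLong]()
    val work = (1 to tasks).map { seed =>
      Future {
        val rng = new Random(seed)
        for (_ <- 1 to perTask) {
          val key = ('A' + rng.nextInt(4)).toChar.toString
          // computeIfAbsent installs at most one counter per key, atomically.
          counts.computeIfAbsent(key, newCounter).incrementAndGet()
        }
      }
    }
    Await.result(Future.sequence(work), 1.minute) // wait for every task
    counts
  }
}
```

Which key each increment lands on is random. The counts across all keys should always add up to `tasks * perTask`.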