
Suppose I want to use a mutable map in Scala to keep track of the number of times I've seen some strings. In a single-threaded context, this is easy:

import scala.collection.mutable.{ Map => MMap }

class Counter {
  val counts = MMap.empty[String, Int].withDefaultValue(0)

  def add(s: String): Unit = counts(s) += 1
}

Unfortunately this isn't thread-safe, since the get and the update don't happen atomically.
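The race is easier to see once the syntactic sugar is removed. On a map, the compiler rewrites `counts(s) += 1` into a read, `counts(s)`, followed by a separate write, `counts.update(s, ...)`. The sketch below spells that out. `ExpandedCounter` and `addExpanded` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable.{ Map => MMap }

class ExpandedCounter {
  val counts = MMap.empty[String, Int].withDefaultValue(0)

  // Roughly what `counts(s) += 1` compiles to: a read, then a separate write.
  def addExpanded(s: String): Unit = {
    val current = counts(s) // read (0 if the key is absent)
    // If another thread reads the same key right here, both threads compute
    // the same `current + 1` and one of the two increments is lost.
    counts.update(s, current + 1) // write
  }
}
```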

Concurrent maps add a few atomic operations to the mutable map API, but not the one I need, which would look something like this:

def replace(k: A, f: B => B): Option[B]
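Here is a minimal sketch of those semantics that uses only the standard library's `scala.collection.concurrent.TrieMap` (available since 2.10). It reads the current value, computes the new one, and commits it with the three-argument `replace(k, oldValue, newValue)` compare-and-set, retrying if another thread got there first. `CasReplace` is a hypothetical name, not an existing API.

```scala
import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap

// Hypothetical helper, not part of the standard library.
object CasReplace {
  // Atomically applies `f` to the value at `k` and returns the previous value,
  // or None if `k` is absent. Retries until no other thread has changed the
  // value between our read and our compare-and-set.
  @tailrec
  def replace[A, B](m: TrieMap[A, B], k: A)(f: B => B): Option[B] =
    m.get(k) match {
      case None => None
      case Some(old) =>
        if (m.replace(k, old, f(old))) Some(old) // nobody raced us: committed
        else replace(m, k)(f)                    // value changed underneath us: retry
    }
}
```

Because `f` may run more than once under contention, it should be a pure function. For a counter, the key would first be seeded with something like `counts.putIfAbsent(s, 0)`.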

I know I can use ScalaSTM's TMap:

import scala.concurrent.stm._

class Counter {
  val counts = TMap.empty[String, Int]

  def add(s: String): Unit = atomic { implicit txn =>
    counts(s) = counts.get(s).getOrElse(0) + 1
  }
}

But (for now) that's still an extra dependency. Other options would include actors (another dependency), synchronization (potentially less efficient), or Java's atomic references (less idiomatic).
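For comparison, the "Java's atomic references" option might look like the following sketch. The `AtomicRefCounter` name is hypothetical. It keeps an immutable `Map` in a `java.util.concurrent.atomic.AtomicReference` and swaps in an updated copy with `compareAndSet`, retrying on failure.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Hypothetical sketch: an immutable Map updated by compare-and-set.
class AtomicRefCounter {
  private val ref = new AtomicReference(Map.empty[String, Int])

  @tailrec
  final def add(s: String): Unit = {
    val current = ref.get
    val updated = current.updated(s, current.getOrElse(s, 0) + 1)
    // compareAndSet compares references, so it fails if any other thread has
    // swapped in a new map since our `get`; in that case we simply retry.
    if (!ref.compareAndSet(current, updated)) add(s)
  }

  // A consistent snapshot of all counts.
  def counts: Map[String, Int] = ref.get
}
```

Reads get a consistent snapshot for free. However, every writer contends on the same reference, so two threads updating different keys still conflict.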

In general I'd avoid mutable maps in Scala, but I've occasionally needed this kind of thing, and most recently I've used the STM approach (instead of just crossing my fingers and hoping I don't get bitten by the naïve solution).

I know there are a number of trade-offs here (extra dependencies vs. performance vs. clarity, etc.), but is there anything like a "right" answer to this problem in Scala 2.10?

Travis Brown
  • What about a single Akka actor that writes to the mutable map? `Counter.add` just sends a fire-and-forget message to it. As for reads, depending on your needs, they can happen concurrently or go through the actor too. – gourlaysama Aug 09 '13 at 15:28

4 Answers


How about this? It assumes you don't really need a general replace method right now, just a counter.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object CountedMap {
  private val counts = new ConcurrentHashMap[String, AtomicInteger]

  def add(key: String): Int = {
    val zero = new AtomicInteger(0)
    // putIfAbsent returns null if the key was absent (our `zero` is now in the map),
    // or else the AtomicInteger that another thread already stored for that key.
    val value = Option(counts.putIfAbsent(key, zero)).getOrElse(zero)
    value.incrementAndGet()
  }
}

You get better performance than synchronizing on the whole map, and you also get atomic increments.

Ionuț G. Stan

The simplest solution is definitely synchronization. If there is not too much contention, performance might not be that bad.

Otherwise, you could try to roll your own STM-like replace implementation. Something like this might do:

object ConcurrentMapOps {
  private val rng = new scala.util.Random
  private val MaxReplaceRetryCount = 10
  private val MinReplaceBackoffTime: Long = 1
  private val MaxReplaceBackoffTime: Long = 20

  // An implicit class can't be top-level or share its name with an object in scope,
  // so it lives inside the object under a different name. Use with `import ConcurrentMapOps._`.
  implicit class RichConcurrentMap[A, B](val m: scala.collection.concurrent.Map[A, B]) {
    private def replaceBackoff(): Unit = {
      // Sleep for a random time between Min and Max milliseconds (a bit crude, I know)
      Thread.sleep((MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime)).toLong)
    }

    def replace(k: A, f: B => B): Option[B] = {
      var retryCount = 0
      while (retryCount <= MaxReplaceRetryCount) {
        // Re-read the current value on every attempt: retrying against a stale
        // `old` would keep failing once another thread has changed it.
        m.get(k) match {
          case None => return None
          case Some(old) =>
            if (m.replace(k, old, f(old))) return Some(old)
            retryCount += 1
            replaceBackoff()
        }
      }
      sys.error("Could not concurrently modify map")
    }
  }
}

Note that collisions are localized to a given key. If two threads access the same map but work on distinct keys, there are no collisions and the replace operation always succeeds on the first try. If a collision is detected, we wait a bit (a random amount of time, to minimize the likelihood of threads fighting forever over the same key) and try again.

I can't guarantee that this is production-ready (I just threw it together), but it might do the trick.

UPDATE: Of course (as Ionuț G. Stan pointed out), if all you want is to increment or decrement a value, Java's ConcurrentHashMap combined with AtomicInteger already provides those operations without locking the whole map. The solution above applies if you need a more general replace method that takes the transformation function as a parameter.

Régis Jean-Gilles
  • I noticed in the Map code he switched to ThreadLocalRandom https://github.com/scala/scala/blob/master/src/library/scala/collection/concurrent/TrieMap.scala#L473 – som-snytt Aug 11 '13 at 14:21

You're asking for trouble if your map is just sitting there as a val. If it meets your use case, I'd recommend something like

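import scala.collection.mutable.{ Map => MMap }

class Counter {
  // Never hand out the mutable map itself; every access goes through its lock.
  private[this] val myCounts = MMap.empty[String, Int].withDefaultValue(0)
  def counts(s: String): Int = myCounts.synchronized { myCounts(s) }
  def add(s: String): Unit = myCounts.synchronized { myCounts(s) += 1 }
  // An immutable snapshot that stays safe to use after the lock is released
  def getCounts: Map[String, Int] = myCounts.synchronized { Map[String, Int]() ++ myCounts }
}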

for low-contention usage. For high-contention usage, you should use a concurrent map designed for it (e.g. java.util.concurrent.ConcurrentHashMap) and wrap the values in AtomicWhatever (AtomicInteger, AtomicLong, ...).
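On Java 8 or later (which postdates this discussion), the high-contention variant can avoid allocating a spare counter on every call by using `ConcurrentHashMap.computeIfAbsent` together with `java.util.concurrent.atomic.LongAdder`. Here is a sketch under that assumption, with a hypothetical `AdderCounter` class:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical sketch; assumes Java 8+ for LongAdder and computeIfAbsent.
class AdderCounter {
  private val counts = new ConcurrentHashMap[String, LongAdder]

  // ConcurrentHashMap applies this at most once per absent key, atomically.
  private val newAdder = new java.util.function.Function[String, LongAdder] {
    def apply(key: String): LongAdder = new LongAdder
  }

  def add(s: String): Unit = counts.computeIfAbsent(s, newAdder).increment()

  // Returns 0 for keys that were never added.
  def count(s: String): Long = {
    val adder = counts.get(s)
    if (adder == null) 0L else adder.sum()
  }
}
```

`LongAdder` spreads contended increments across internal cells, which is why it tends to beat a single `AtomicLong` under heavy write load. The trade-off is that `sum()` is not an atomic snapshot while increments are still in flight.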

Rex Kerr

If you're OK with a Future-based interface:

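import java.util.concurrent.Executors
import scala.collection.mutable.{ Map => MMap }
import scala.concurrent.{ ExecutionContext, Future }

trait SingleThreadedExecutionContext {
  // One dedicated thread: everything submitted to `ec` runs sequentially
  val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
}

class Counter extends SingleThreadedExecutionContext {
  private val counts = MMap.empty[String, Int].withDefaultValue(0)

  // Reads and writes both go through `ec`, so only its thread ever touches the map
  def get(s: String): Future[Int] = Future(counts(s))(ec)

  def add(s: String): Future[Unit] = Future(counts(s) += 1)(ec)
}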

A test would look like this:

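import org.specs2.mutable.Specification
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration.Duration

class MutableMapSpec extends Specification {
  // Block on a future in the test, failing if it takes longer than the timeout
  def awaitResult[T](f: Future[T]): T = Await.result(f, Duration(5, "seconds"))

  "thread safe" in {

    import ExecutionContext.Implicits.global

    val c = new Counter
    val testData = Seq.fill(16)("1")
    awaitResult(Future.traverse(testData)(c.add))
    awaitResult(c.get("1")) mustEqual 16
  }
}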
Mushtaq Ahmed
  • This is not thread safe at all. While you guarantee a single writer at a time, you can still have threads reading while the map is being modified – Régis Jean-Gilles Aug 10 '13 at 08:53
  • As I understand it, all operations (reads, writes, or a mix) that use ec as their context will be thread-safe. Operations outside that context will not be. I'd be glad to hear from others whether this understanding is correct. – Mushtaq Ahmed Aug 10 '13 at 11:59
  • But the thing is, the reading is done directly: when you access `c.counts` you are not using the `ExecutionContext` at all. – Régis Jean-Gilles Aug 10 '13 at 12:43
  • c.counts is a mutable map so exposing it to the outside world is not a good idea. I updated the code by adding a get method and making map private. The gist is: all read/write to the mutable map must be guarded by ec. If that is not acceptable, use a concurrent data structure. – Mushtaq Ahmed Aug 10 '13 at 13:03
  • Yes, that was my whole point. This is much better now. Actually, if the OP can indeed adapt his code to use futures, this is the best solution in terms of ease of use and correctness (though not in terms of speed). – Régis Jean-Gilles Aug 10 '13 at 14:15
  • I'm not sure I'd want to dedicate a thread to one counter, but maybe if it was doing other drudge work; a get could wait in a long queue. But if hourly rates correlate with SO rep, this is a great deal. Also satisfies the no-dependency requirement, if Akka is a dependency. – som-snytt Aug 11 '13 at 14:29
  • Of course, if this is literally just to perform increments, this is overkill. For something more substantial in terms of computation, it is a nice enough solution. A fully generic solution would actually use a fixed set of one-thread pools (a pool of thread pools) and make sure that each instance of the map always uses the same one-thread pool. This would prevent the thread count from exploding (as the number of maps grows) while still ensuring that there is no thread-unsafe access. – Régis Jean-Gilles Aug 11 '13 at 16:22
  • @RégisJean-Gilles Yes. I +1 this + yours, which I mashed up to some effect below, for some value of some. – som-snytt Aug 13 '13 at 20:56
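The "pool of one-thread pools" idea from the comments could be sketched roughly as follows. `StripedContexts`, `StripedCounter`, and the stripe count of 4 are all hypothetical. Each counter hashes its identity to one of a fixed set of single-threaded executors, so its map is only ever touched by that executor's thread, while the total thread count stays bounded no matter how many counters exist.

```scala
import java.util.concurrent.{ ExecutorService, Executors }
import scala.collection.mutable.{ Map => MMap }
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical: a fixed set of single-threaded executors shared by all counters.
object StripedContexts {
  private val Stripes = 4 // assumption: tune to the workload
  private val executors: Vector[ExecutorService] =
    Vector.fill(Stripes)(Executors.newSingleThreadExecutor())
  private val contexts: Vector[ExecutionContext] =
    executors.map(e => ExecutionContext.fromExecutor(e))

  // The same owner always maps to the same stripe (and therefore the same thread).
  def forOwner(owner: AnyRef): ExecutionContext =
    contexts((System.identityHashCode(owner) & Int.MaxValue) % Stripes)

  def shutdown(): Unit = executors.foreach(_.shutdown())
}

class StripedCounter {
  private val counts = MMap.empty[String, Int].withDefaultValue(0)
  private val ec = StripedContexts.forOwner(this)

  // All access to `counts` is funneled through this counter's single stripe thread.
  def get(s: String): Future[Int] = Future(counts(s))(ec)
  def add(s: String): Future[Unit] = Future(counts(s) += 1)(ec)
}
```

Executors created by `newSingleThreadExecutor` use non-daemon threads, so something has to call `shutdown()` before the application can exit.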