
My core question is: how can I implement synchronization in a method on the combination of the object instance and the method parameter?

Here are the details of my situation. I'm using the following code to implement memoization, adapted from this answer:

/**
 * Memoizes a unary function
 * @param f the function to memoize
 * @tparam T the argument type
 * @tparam R the result type
 */
class Memoized[-T, +R](f: T => R) extends (T => R) {

  import scala.collection.mutable

  private[this] val cache = mutable.Map.empty[T, R]

  def apply(x: T): R = cache.getOrElse(x, {
    val y = f(x)
    cache += ((x, y))
    y
  })
}

In my project, I'm memoizing Futures to deduplicate asynchronous API calls. This worked fine when I used for...yield to map over the resulting futures, created with the standard ExecutionContext. After I upgraded to Scala Async for nicer handling of these futures, however, I realized that the multithreading that library uses allowed multiple threads to enter apply at once, defeating memoization: the async blocks all executed in parallel and entered the getOrElse fallback thunk before cache could be updated with a new Future.

To work around this, I put the main apply function in a this.synchronized block:

def apply(x: T): R = this.synchronized {
  cache.getOrElse(x, {
    val y = f(x)
    cache += ((x, y))
    y
  })
}

This restored the memoized behavior. The drawback is that this will block calls with different params, at least until the Future is created. I'm wondering if there is a way to set up finer grained synchronization on the combination of the Memoized instance and the value of the x parameter to apply. That way, only calls that would be deduplicated will be blocked.

As a side note, I'm not sure this is truly performance critical, because the synchronized block will release once the Future is created and returned (I think?). But if there are any concerns with this that I'm not thinking of, I would also like to know.

acjay
  • Have you considered using an actor? Then client code would never block on responses, and only a single thread will touch `cache`. – wingedsubmariner Jun 20 '14 at 12:44
  • I hadn't thought of that, but based on what little I know, that sounds like a more idiomatic solution. Could you explain more or provide a link to how this works for consuming an API from within the controller code for a web response? – acjay Jun 20 '14 at 12:51
  • Well, per my understanding, multithreading cannot help much for establishing memoization: http://stackoverflow.com/a/20462893/2073130 – lcn Dec 15 '14 at 00:25

1 Answer


Akka actors combined with futures provide a powerful way to wrap mutable state without blocking. Here is a simple example of how to use an Actor for memoization:

import akka.actor._
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent._
import scala.concurrent.duration._

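class Memoize(system: ActorSystem) {
  // The actor owns the cache and handles one message at a time,
  // so only a single thread ever touches the map.
  class CacheActor(f: Any => Future[Any]) extends Actor {
    private[this] val cache = scala.collection.mutable.Map.empty[Any, Future[Any]]

    def receive = {
      // Reply with the cached Future, creating and storing it on first request.
      case x => sender ! cache.getOrElseUpdate(x, f(x))
    }
  }

  def apply[K, V](f: K => Future[V]): K => Future[V] = {
    val fCast = f.asInstanceOf[Any => Future[Any]]
    val actorRef = system.actorOf(Props(new CacheActor(fCast)))
    implicit val timeout: Timeout = Timeout(5.seconds) // limit for the ask below
    import system.dispatcher
    // ask yields a Future of the actor's reply, which is itself a Future[V]; flatten it.
    x => actorRef.ask(x).asInstanceOf[Future[Future[V]]].flatMap(identity)
  }
}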

We can use it like this:

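val system = ActorSystem()
import system.dispatcher // ExecutionContext for Future { ... } and foreach

val memoize = new Memoize(system)
val f = memoize { x: Int =>
  println("Computing for " + x)
  // Future { ... } runs the slow work off the actor's thread;
  // Future.successful would evaluate the block inside receive.
  scala.concurrent.Future {
    Thread.sleep(1000)
    x + 1
  }
}
f(5).foreach(println)
f(5).foreach(println)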

And "Computing for 5" will only print a single time, but "6" will print twice.

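There are some scary-looking asInstanceOf calls, but they are safe here: the actor is private to apply, so it only ever receives arguments of type K through the returned function, and every cached Future comes from f. The casts therefore never see a value of the wrong type.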
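
For comparison, the per-key synchronization the question asks about can also be approximated without actors. The following is only a minimal sketch, assuming the memoized function returns a Future; the class name PerKeyMemoized is hypothetical and not part of the code above. It relies on java.util.concurrent.ConcurrentHashMap.putIfAbsent, which is atomic per key, so exactly one caller for a given argument ends up invoking f, while calls with different arguments do not wait on each other.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Hypothetical helper: memoizes a Future-returning function with per-key races
// resolved by putIfAbsent instead of a lock on the whole instance.
class PerKeyMemoized[K, V](f: K => Future[V]) extends (K => Future[V]) {

  private[this] val cache = new ConcurrentHashMap[K, Promise[V]]()

  def apply(x: K): Future[V] = {
    val cached = cache.get(x)
    if (cached != null) cached.future
    else {
      val fresh = Promise[V]()
      // Atomic per key: only one caller gets null back and "wins" the race.
      val raced = cache.putIfAbsent(x, fresh)
      if (raced != null) raced.future // another caller won; share its result
      else {
        // Only the winner calls f. A synchronous exception becomes a failed Future.
        val started = try f(x) catch { case NonFatal(e) => Future.failed[V](e) }
        fresh.completeWith(started)
        fresh.future
      }
    }
  }
}
```

As with the original Memoized class, failed Futures stay in the cache in this sketch, so a failed call is never retried; evicting failures would need extra handling.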

wingedsubmariner