
Consider the following:

import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global

def slowInt(i: Int) = { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
def futures = (1 to 20).map(i => Future(slowInt(i))) // a def: each reference creates and starts a fresh batch of futures

def timeFuture(fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  Await.result(fn, Inf)
  println((System.currentTimeMillis - t0) / 1000.0 + "s")
}

Both of the following print ~2.5s:

// Use Future.reduce directly (Future.traverse is no different)
timeFuture { Future.reduce(futures)(slowAdd) }

// First wait for all results to come in, convert to Future[List], and then map the List[Int]
timeFuture { Future.sequence(futures).map(_.reduce(slowAdd)) }

As far as I can tell, the reason is that Future.reduce/traverse is generic and therefore can't exploit the associativity of the operator. Is there an easy way to define a computation where the folding/reducing starts as soon as at least two values are available (or one, in the case of fold), so that while some items in the list are still being generated, the already generated ones are already being combined?

Erik Kaplun
  • Actually, https://gist.github.com/viktorklang/4488970. – som-snytt Mar 30 '14 at 05:43
  • OK, got it. Will wait for additional ideas though. – Erik Kaplun Mar 30 '14 at 05:55
  • It seems like you should be able to do this with an [RxScala](http://rxscala.github.io/) [Observable](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable), but I'm not exactly sure how you would chain them together. – DaoWen Mar 30 '14 at 07:01
  • Once you've got a `select` or `chooseAny` function it's pretty straightforward to pull two completed items off the list, push a future of their sum back on the list, and recurse (assuming your operation is both associative and commutative). Here's [a quick example](https://gist.github.com/travisbrown/9873125) that uses a `collapse` method I've written before for Scalaz's futures. – Travis Brown Mar 30 '14 at 14:01
  • @TravisBrown: thanks for pointing out that commutativity is also required; and I'll also look into Scalaz `Nondeterminism`. – Erik Kaplun Mar 30 '14 at 15:07
  • @TravisBrown: if you post your comment as an answer, I'll accept it—it seems like the only fastest possible solution. – Erik Kaplun Apr 02 '14 at 00:09
  • @ErikAllik: I've just added an answer—I'm sure it's not the only solution, but it works. – Travis Brown Apr 03 '14 at 21:34
  • surely there are more solutions, but at least it provides a best possible timing solution :) the others were only approximations (but not bad nevertheless). – Erik Kaplun Apr 03 '14 at 23:52

3 Answers


Scalaz has an implementation of futures that includes a chooseAny combinator that takes a collection of futures and returns a future of a tuple of the first completed element and the rest of the futures:

def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])]

Twitter's implementation of futures calls this select. The standard library doesn't include it (but see Viktor Klang's implementation pointed out by Som Snytt above). I'll use Scalaz's version here, but translation should be straightforward.
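For a flavor of what such a select might look like on top of standard-library futures, here is a minimal sketch in the spirit of the Klang gist (the name select and the simplified signature are mine; unlike the gist, which hands back a Try for the winner, this version fails the whole future if the winner failed):

import scala.concurrent.{ExecutionContext, Future, Promise}

def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(A, Seq[Future[A]])] = {
  require(fs.nonEmpty, "select on an empty sequence of futures")
  val p = Promise[(A, Seq[Future[A]])]()
  fs.foreach { f =>
    f.onComplete { result =>
      // The first future to complete wins the race; the rest are handed back untouched.
      p.tryComplete(result.map(value => (value, fs.filterNot(_ eq f))))
    }
  }
  p.future
}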

One approach to getting the operations to run as you wish is to pull two completed items off the list, push a future of their sum back on the list, and recurse (see this gist for a complete working example):

def collapse[A](fs: Seq[Future[A]])(implicit M: Monoid[A]): Future[A] =
  // Race all the futures; if the sequence is empty, fall back to the monoid's zero.
  Nondeterminism[Future].chooseAny(fs).fold(Future.now(M.zero))(
    _.flatMap {
      case (hv, tf) =>
        // Race the rest; if nothing is left, the first value is the result.
        Nondeterminism[Future].chooseAny(tf).fold(Future.now(hv))(
          _.flatMap {
            case (hv2, tf2) =>
              // Append the sum of the two earliest values and recurse.
              collapse(Future(hv |+| hv2) +: tf2)
          }
        )
    }
  )

In your case you'd call something like this:

timeFuture(
  collapse(futures)(
    Monoid.instance[Int]((a, b) => slowAdd(a, b), 0)
  )
)

This runs in just a touch over 1.6 seconds on my dual core laptop, so it's working as expected (and will continue to do what you want even if the time taken by slowInt varies).
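For completeness, here is a hypothetical translation of the same idea to standard-library futures, built on the select sketch above (collapseStd, zero, and plus are made-up names, and failures short-circuit the whole computation in this simplified version):

import scala.concurrent.{ExecutionContext, Future}

def collapseStd[A](fs: Seq[Future[A]])(zero: A)(plus: (A, A) => A)(
    implicit ec: ExecutionContext): Future[A] =
  if (fs.isEmpty) Future.successful(zero)
  else if (fs.size == 1) fs.head
  else
    select(fs).flatMap { case (x, rest) =>
      select(rest).flatMap { case (y, rest2) =>
        // Combine the two earliest results off-thread and race the partial
        // sum against the futures that are still running.
        collapseStd(Future(plus(x, y)) +: rest2)(zero)(plus)
      }
    }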

Travis Brown

To get timings similar to yours, I had to use a local ExecutionContext (from here):

import java.util.concurrent.Executors

implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
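A side note, not from the original answer: the default pool struggles here because Thread.sleep blocks its worker threads, and the global pool's parallelism is capped at the core count. An alternative to swapping the pool is to mark the blocking section, which lets the global pool grow temporarily; a hypothetical variant of the question's slowInt:

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

def slowIntBlocking(i: Int): Future[Int] =
  Future {
    // The blocking hint tells the global fork-join pool this task parks a
    // thread, so it can spawn extra threads instead of starving other futures.
    blocking { Thread.sleep(200); i }
  }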

After that, I got better performance by splitting the list in two and starting the work on each half by assigning it to a val (recalling that futures in a for-comprehension run in sequence unless they are created before the for-comprehension). Because the operation is associative, I could then recombine the halves with one more call to the same function. I modified the timeFuture function to take a description and print the result of the addition:

def timeFuture(desc: String, fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  val res = Await.result(fn, Inf)
  println(desc + " = " + res + " in " + (System.currentTimeMillis - t0) / 1000.0 + "s")
}
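As an aside, the sequencing point above is easy to see directly; a minimal sketch using the question's slowInt and slowAdd:

// Futures created inline in a for-comprehension run one after the other,
// because the second Future is only constructed once the first flatMap fires:
val sequential = for {
  a <- Future(slowInt(1))
  b <- Future(slowInt(2)) // starts only after `a` completes: ~400ms before the add
} yield slowAdd(a, b)

// Assigning the futures to vals first starts them immediately, so they overlap:
val fa = Future(slowInt(1))
val fb = Future(slowInt(2))
val parallel = for { a <- fa; b <- fb } yield slowAdd(a, b) // ~200ms before the add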

I'm new to Scala, so I'm still working out how to re-use the same function for the last step (I think it should be possible), so I cheated and created a helper function:

def futureSlowAdd(x: Int, y: Int) = Future(slowAdd(x, y))

Then I could do the following:

timeFuture( "reduce", { Future.reduce(futures)(slowAdd) } )

val left = Future.reduce(futures.take(10))(slowAdd)
val right = Future.reduce(futures.takeRight(10))(slowAdd)
timeFuture( "split futures", (left zip right) flatMap (futureSlowAdd _).tupled )

The final zip and flatMap combination comes from here.

I think this parallelizes the work and recombines the results. When I run those, I get:

reduce = 210 in 2.111s
split futures = 210 in 1.201s

I've used a hard-coded pair of takes, but the idea is that the whole list-splitting could be put into a function that re-uses the associative function on both the right and left branches (with slightly unbalanced trees allowed due to remainders).


When I randomize the slowInt() and slowAdd() functions like:

import scala.util.Random

def rand(): Int = Random.nextInt(3) + 1
def slowInt(i: Int) = { Thread.sleep(rand()*100); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(rand()*100); x + y }

I still see "split futures" completing sooner than "reduce". There appears to be some startup overhead that affects the first timeFuture call. Here are a few runs where that penalty falls on "split futures", which goes first:

split futures = 210 in 2.299s
reduce = 210 in 4.7s

split futures = 210 in 2.594s
reduce = 210 in 3.5s

split futures = 210 in 2.399s
reduce = 210 in 4.401s

On a faster computer than my laptop, and using the same ExecutionContext as in the question, I don't see such large differences (without the randomization in the slow* functions):

split futures = 210 in 2.196s
reduce = 210 in 2.5s

Here it looks like "split futures" leads by only a little.


One last go. Here's a function (aka abomination) that extends the idea above:

def splitList[A](f: List[Future[A]], assocFn: (A, A) => A): Future[A] = {
  // Combine two futures' results with the associative function, off-thread.
  def applyAssocFn(x: Future[A], y: Future[A]): Future[A] =
    (x zip y) flatMap { case (a, b) => Future(assocFn(a, b)) }

  // Recursively halve both branches, combining as pairs become available.
  def divideAndConquer(right: List[Future[A]], left: List[Future[A]]): Future[A] =
    (right, left) match {
      case (r :: Nil, Nil) => r
      case (Nil, l :: Nil) => l
      case (r :: Nil, l :: Nil) => applyAssocFn(r, l)
      case (r :: Nil, l :: ls) =>
        val (lRight, lLeft) = ls.splitAt(ls.size / 2)
        val lret = applyAssocFn(l, divideAndConquer(lRight, lLeft))
        applyAssocFn(r, lret)
      case (r :: rs, l :: Nil) =>
        val (rRight, rLeft) = rs.splitAt(rs.size / 2)
        val rret = applyAssocFn(r, divideAndConquer(rRight, rLeft))
        applyAssocFn(rret, l)
      case (r :: rs, l :: ls) =>
        val (rRight, rLeft) = rs.splitAt(rs.size / 2)
        val (lRight, lLeft) = ls.splitAt(ls.size / 2)
        val tails = applyAssocFn(divideAndConquer(rRight, rLeft), divideAndConquer(lRight, lLeft))
        val heads = applyAssocFn(r, l)
        applyAssocFn(heads, tails)
      case (Nil, Nil) =>
        sys.error("splitList requires a non-empty list") // unreachable for non-empty input
    }

  val (right, left) = f.splitAt(f.size / 2)
  divideAndConquer(right, left)
}

It takes all the pretty out of Scala, splitting the list non-tail-recursively and assigning the futures to vals as soon as possible (to start them).

When I test it like:

timeFuture( "splitList", splitList( futures.toList, slowAdd) )

I get the following timings on my laptop using the newCachedThreadPool():

splitList = 210 in 0.805s
split futures = 210 in 1.202s
reduce = 210 in 2.105s

I noticed that the "split futures" timings could be invalid because those futures are started outside of the timeFuture block. However, the splitList function is called entirely inside timeFuture. One take-away for me is the importance of picking an ExecutionContext that suits the hardware.

n0741337
  • I think this suffers from the same limitation I pointed out about Dave Swartz's answer. – Erik Kaplun Mar 31 '14 at 01:57
  • In my tests, the "split futures" always complete faster than the "reduce". That's even if I randomize the `slowInt()` and `slowAdd()` functions. Future.sequence has a for-comprehension and I was trying to start the futures outside of that. In this case, I'm starting half earlier than I think they might have started otherwise. – n0741337 Mar 31 '14 at 16:23
  • I'm not saying it's not faster; try with futures = [400ms, 200ms, 400ms, 200ms] and you should see that it takes 700ms but it could take 600ms—am I wrong? – Erik Kaplun Apr 02 '14 at 00:08
  • Anyway, looks like we aren't getting any more solutions, so +1 at least :) thanks for trying. – Erik Kaplun Apr 02 '14 at 00:11
  • Oh. I think I see how this might be achieved using reactive programming with Akka Actors. I was skipping "knowing when something's done" by brute-forcing more futures to start earlier in `splitList`. However, if 20 slowAdd() actors spun up and told another when each finished so it could apply the associative function, I think it could come closer to your goal via eventual consistency. But that wouldn't require Futures either... – n0741337 Apr 02 '14 at 15:01
  • I meant to say 20 slowInt() actors, which fire off to as-needed slowAdd() actors, which then fire their results back to the main, accumulating actor. – n0741337 Apr 02 '14 at 15:13
  • BTW - `splitList` handles the case futures = [400ms, 200ms, 400ms, 200ms] in ~600ms. With the startup penalty on either ExecutionContext, I get `0.642s` for that versus `0.701s` for the "reduce" case going 2nd. Startup penalty is around 40ms for either starting first. – n0741337 Apr 03 '14 at 14:28

The answer below runs in 700ms on a 20-core machine, which, given what must run in sequence, is the best any implementation can do on any machine (20 parallel 200ms slowInt calls followed by 5 nested 100ms slowAdd calls). It runs in 1600ms on my 4-core machine, which is the best one can do on that machine.

When the slowAdd calls are expanded, with f representing slowAdd:

f(f(f(f(f(x1, x2), f(x3, x4)), f(f(x5, x6), f(x7, x8))), f(f(f(x9, x10), f(x11, x12)), f(f(x13, x14), f(x15, x16)))), f(f(x17, x18), f(x19, x20)))

The example you provided that uses Future.sequence runs in 2100ms on a 20-core machine (20 parallel 200ms slowInt calls followed by 19 nested 100ms slowAdd calls) and in 2900ms on my 4-core machine.

When the slowAdd calls are expanded, with f representing slowAdd:

f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(x1, x2), x3), x4), x5), x6), x7), x8), x9), x10), x11), x12), x13), x14), x15), x16), x17), x18), x19), x20)

The Future.reduce method calls Future.sequence(futures).map(_ reduceLeft op) under the hood, so the two examples you provided are equivalent.
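Roughly, the standard-library definition has this sequence-then-reduceLeft shape, sketched here with List for simplicity (see the scala.concurrent sources for the exact code):

def reduceSketch[T](futures: List[Future[T]])(op: (T, T) => T)(
    implicit ec: ExecutionContext): Future[T] =
  if (futures.isEmpty)
    Future.failed(new NoSuchElementException("reduce attempted on empty collection"))
  else
    Future.sequence(futures).map(_ reduceLeft op) // a strict left fold: no reordering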

My answer uses a combine function that takes as parameters a list of futures and op, a function that combines two futures into one. The function applies op to pairs of futures, then to pairs of pairs, and so on, until a single future remains, which it returns:

def combine[T](list: List[Future[T]], op: (Future[T], Future[T]) => Future[T]): Future[T] =
  if (list.size == 1) list.head
  else if (list.size == 2) list.reduce(op)
  else combine(list.grouped(2).map(combine(_, op)).toList, op)
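To see the recursion, for five futures a..e (with f standing for op) the calls unfold as:

// combine(List(a, b, c, d, e))
//   -> combine(List(f(a, b), f(c, d), e))
//   -> combine(List(f(f(a, b), f(c, d)), e))
//   -> f(f(f(a, b), f(c, d)), e)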

Note: I modified your code a bit to match my style preferences.

def slowInt(i: Int): Future[Int] = Future { Thread.sleep(200); i }
def slowAdd(fx: Future[Int], fy: Future[Int]): Future[Int] = fx.flatMap(x => fy.map { y => Thread.sleep(100); x + y })
val futures: List[Future[Int]] = List.range(1, 21).map(slowInt)

The code below uses the combine function for your case:

timeFuture(combine(futures, slowAdd))

The code below updates your Future.sequence example for my modifications:

timeFuture(Future.sequence(futures).map(_.reduce{(x, y) => Thread.sleep(100); x + y }))
Dave Swartz
  • You're assuming all `slowInt` calls take the same time to complete.. how about when the initial list of `slowInt`s takes [200ms, 400ms, 200ms, 400ms] to complete? With the current algorithm, you'll have 2 groups of [200ms, 400ms], so the list still takes 400ms to generate + sequentially followed by 3x `slowAdd` calls => (400+300)ms=700ms, even though instead the two 200ms ones could be `slowAdd`ed first, so by the time the two 400ms ones are done, you'll have 100ms worth of work done, and it's another 2x `slowAdd`s, totalling (400+100+100)ms=600ms—have I missed something? – Erik Kaplun Mar 30 '14 at 22:46
  • Thanks for pointing out that `Future.reduce` uses `Future.sequence` under the hood, by the way; I should have peeked into the source code myself before asking. – Erik Kaplun Mar 30 '14 at 22:50
  • You are correct. If the `slowInt` calls do not take exactly 200ms to complete, then the algorithm can be improved upon. – Dave Swartz Mar 31 '14 at 00:13
  • OK, looks like we aren't getting any more solutions, so +1 at least :) thanks for trying. – Erik Kaplun Apr 02 '14 at 00:10