To get timings similar to yours, I had to use a local ExecutionContext like this (from here):
import java.util.concurrent.Executors
implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
After that, I got better performance by splitting up the list and starting the work on each half by assigning the intermediate futures to vals (remembering that futures in a for-comprehension run in order unless they are assigned to vals before the for-comprehension). Because the combining function is associative, I could then re-combine the two halves with one more call to the same function.
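To illustrate that for-comprehension point, here's a minimal sketch that reuses slowInt and slowAdd from the question (the val names are mine):

```scala
// Runs sequentially: the second future isn't even created until the first
// completes, because the for-comprehension desugars to nested flatMap/map.
val sequential = for {
  a <- future(slowInt(1))
  b <- future(slowInt(2))
} yield slowAdd(a, b)

// Runs in parallel: both futures are created (and start) before the
// for-comprehension combines their results.
val fa = future(slowInt(1))
val fb = future(slowInt(2))
val parallel = for {
  a <- fa
  b <- fb
} yield slowAdd(a, b)
```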
I modified the timeFuture function to take a description and print the result of the addition:
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration.Inf

def timeFuture(desc: String, fn: => Future[_]) = {
  val t0 = System.currentTimeMillis
  val res = Await.result(fn, Inf)
  println(desc + " = " + res + " in " + (System.currentTimeMillis - t0) / 1000.0 + "s")
}
I'm new to Scala, so I'm still working out how to re-use the same function at the last step (I think it should be possible), so I cheated and created a helper function:
def futureSlowAdd(x: Int, y: Int) = future(slowAdd(x, y))
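(For reference, futures below refers to the list of futures from the question; I'm assuming it was built roughly like this, which is also why every result below is 210, the sum of 1 through 20:)

```scala
// Assumed setup from the question: 20 futures that each return their index
// after a delay, plus a slow addition (1 + 2 + ... + 20 = 210).
def slowInt(i: Int) = { Thread.sleep(100); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
val futures = (1 to 20).map(i => future(slowInt(i)))
```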
Then I could do the following:
timeFuture( "reduce", { Future.reduce(futures)(slowAdd) } )
val right = Future.reduce(futures.take(10))(slowAdd)
val left = Future.reduce(futures.takeRight(10))(slowAdd)
timeFuture( "split futures", (right zip left) flatMap (futureSlowAdd _).tupled)
The final zip plus flatMap is from here.
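(On reflection, I think the helper could also be avoided by re-using Future.reduce for the final combination, something like:)

```scala
timeFuture( "split futures", Future.reduce(List(right, left))(slowAdd) )
```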
I think this parallelizes the work and recombines the results. When I run those, I get:
reduce = 210 in 2.111s
split futures = 210 in 1.201s
I've used a hard-coded pair of takes, but my idea is that the whole list-splitting could be put into a function that re-uses the associative function handed to it on both the right and left branches (with slightly unbalanced trees allowed due to remainders).
When I randomize the slowInt() and slowAdd() functions like:
import scala.util.Random

def rand(): Int = Random.nextInt(3)+1
def slowInt(i: Int) = { Thread.sleep(rand()*100); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(rand()*100); x + y }
I still see "split futures" completing sooner than "reduce". There appears to be some overhead to starting up, that affects the first timeFuture
call. Here's a few examples of running them with the startup penalty over "split futures":
split futures = 210 in 2.299s
reduce = 210 in 4.7s
split futures = 210 in 2.594s
reduce = 210 in 3.5s
split futures = 210 in 2.399s
reduce = 210 in 4.401s
On a faster computer than my laptop, and using the same ExecutionContext as in the question, I don't see such large differences (without the randomization in the slow* functions):
split futures = 210 in 2.196s
reduce = 210 in 2.5s
Here it looks like "split futures" only leads by a little bit.
One last go. Here's a function (aka abomination) that extends the idea I had above:
def splitList[A <: Any](f: List[Future[A]], assocFn: (A, A) => A): Future[A] = {
  // Combine two futures with the associative function, inside another future.
  def applyAssocFn(x: Future[A], y: Future[A]): Future[A] = {
    (x zip y) flatMap { case (a, b) => future(assocFn(a, b)) }
  }
  // Recursively halve both branches and combine the results.
  // Note: the match assumes a non-empty input list.
  def divideAndConquer(right: List[Future[A]], left: List[Future[A]]): Future[A] = {
    (right, left) match {
      case (r :: Nil, Nil) => r
      case (Nil, l :: Nil) => l
      case (r :: Nil, l :: Nil) => applyAssocFn(r, l)
      case (r :: Nil, l :: ls) => {
        val (l_right, l_left) = ls.splitAt(ls.size / 2)
        val lret = applyAssocFn(l, divideAndConquer(l_right, l_left))
        applyAssocFn(r, lret)
      }
      case (r :: rs, l :: Nil) => {
        val (r_right, r_left) = rs.splitAt(rs.size / 2)
        val rret = applyAssocFn(r, divideAndConquer(r_right, r_left))
        applyAssocFn(rret, l)
      }
      case (r :: rs, l :: ls) => {
        val (r_right, r_left) = rs.splitAt(rs.size / 2)
        val (l_right, l_left) = ls.splitAt(ls.size / 2)
        val tails = applyAssocFn(divideAndConquer(r_right, r_left), divideAndConquer(l_right, l_left))
        val heads = applyAssocFn(r, l)
        applyAssocFn(heads, tails)
      }
    }
  }
  val (right, left) = f.splitAt(f.size / 2)
  divideAndConquer(right, left)
}
It takes all the pretty out of Scala to split the list up non-tail-recursively and assign the futures to vals as soon as possible (so that they start right away).
When I test it like:
timeFuture( "splitList", splitList( futures.toList, slowAdd) )
I get the following timings on my laptop using the newCachedThreadPool():
splitList = 210 in 0.805s
split futures = 210 in 1.202s
reduce = 210 in 2.105s
I noticed that the "split futures" timings could be invalid because the futures are started outside of the timeFutures
block. However, the splitList
function should be called correctly inside the timeFutures
function. One take-away for me is the importance of picking an ExecutionContext that's best for the hardware.
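For example, the unbounded cached pool works well here because the sleeps block threads, but for CPU-bound work a pool sized to the core count might be the better choice; a quick sketch of both options:

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Unbounded pool: suits many short, blocking tasks like the sleeps above.
val cachedEc = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

// Pool sized to the hardware: often a better fit for CPU-bound work.
val fixedEc = ExecutionContext.fromExecutor(
  Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors()))
```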