
I cannot find a way to limit the number of unprocessed Futures in Scala. For example, in the following code:

import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global
for (i <- 1 to N) {
  val f = Future {
    //Some Work with bunch of object creation
  }
}

If N is too big, it will eventually throw an OutOfMemoryError. Is there a way to limit the number of unprocessed Futures, either with a queue-like wait or with an exception?

yurybubnov
    This is true in general and is not limited to working with just futures. If you're creating/retaining a large number of objects of any type then you'll eventually run out of memory. It's your responsibility to limit this as appropriate. – JimN Jun 11 '16 at 01:05

1 Answer


So, the simplest answer is that you can create an ExecutionContext that blocks or throttles the execution of new tasks beyond a certain limit. See this blog post. For a more fleshed-out implementation of a blocking Java ExecutorService, see this example. (You can use it directly if you want; the library is on Maven Central.) It wraps an ordinary, non-blocking ExecutorService, which you can create using the factory methods of java.util.concurrent.Executors.
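If you would rather not add a dependency, the core idea can be sketched with just the JDK and the Scala standard library. The class below is a hypothetical `BlockingBoundedExecutionContext`, not the library class linked above: a `Semaphore` caps the number of queued-or-running tasks, so `execute` (and therefore `Future { ... }`) blocks the submitting thread once `maxInFlight` tasks are outstanding. It is a single-threshold simplification of a block/restart scheme. One caveat: tasks that themselves schedule more work on the same context (for example `map` or `flatMap` callbacks) can deadlock once all permits are held, so this sketch suits flat, independent Futures best. `BoundedDemo` and its sizes are illustrative only.

```scala
import java.util.concurrent.{ExecutorService, Executors, Semaphore}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper: at most `maxInFlight` tasks may be queued or running at once.
// Further calls to execute() block the submitting thread until a task finishes.
final class BlockingBoundedExecutionContext(underlying: ExecutorService, maxInFlight: Int)
    extends ExecutionContext {

  private val permits = new Semaphore(maxInFlight)

  override def execute(task: Runnable): Unit = {
    permits.acquire() // blocks the caller while maxInFlight tasks are outstanding
    try {
      underlying.execute(new Runnable {
        def run(): Unit =
          try task.run()
          finally permits.release() // free the slot once the task has finished
      })
    } catch {
      case t: Throwable =>
        permits.release() // submission failed (e.g. rejected), so return the slot
        throw t
    }
  }

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

object BoundedDemo {
  // Creates n small Futures on 4 threads with at most 8 in flight; returns their sum.
  def run(n: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = new BlockingBoundedExecutionContext(pool, 8)
    try {
      // Future creation stalls here whenever 8 tasks are still outstanding.
      val futures = (1 to n).map(i => Future(i * 2))
      // Await each Future directly; Future.sequence would register callbacks on
      // the bounded context, which could deadlock once every permit is taken.
      futures.map(f => Await.result(f, 30.seconds)).sum
    } finally pool.shutdown()
  }
}
```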

Converting a Java ExecutorService into a Scala ExecutionContext is just a call to ExecutionContext.fromExecutorService( executorService ). So, using the library linked above, you might have code like...

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.mchange.v3.concurrent.BoundedExecutorService

val executorService = new BoundedExecutorService(
  Executors.newFixedThreadPool( 10 ), // a pool of ten Threads
  100,                                // block new tasks when 100 are in process
  50                                  // restart accepting tasks when the number of in-process tasks falls below 50
)

implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutorService( executorService )

// do stuff that creates lots of futures here...

That's fine if you want a bounded ExecutorService that lasts as long as your whole application. But if you are creating lots of futures at one localized point in your code, you will want to shut down the ExecutorService when you are done with it. I define loan-pattern methods in Scala (also on Maven Central) that both create the context and shut it down when I'm done. The code ends up looking like...

import com.mchange.sc.v2.concurrent.ExecutionContexts

ExecutionContexts.withBoundedFixedThreadPool( size = 10, blockBound = 100, restartBeneath = 50 ) { implicit executionContext =>
    // do stuff that creates lots of futures here...

    // make sure the Futures have completed before the scope ends!
    // that's important! otherwise, some Futures will never get to run
}
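The loan pattern itself is easy to reproduce without the library. Below is a minimal sketch with a hypothetical `LoanedPools.withFixedThreadPool`, which is not the mchange `ExecutionContexts` API: the body returns a single `Future` representing all of its work, and the helper waits for that Future before shutting the pool down, which enforces the "make sure the Futures have completed" rule by construction. For brevity it uses an unbounded `Executors.newFixedThreadPool`; in practice you would plug in a bounded executor.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LoanedPools {
  // Hypothetical loan-pattern helper: create a pool, lend its ExecutionContext to
  // `body`, wait for the Future that `body` returns, then shut the pool down.
  def withFixedThreadPool[T](threads: Int, timeout: FiniteDuration)(
      body: ExecutionContext => Future[T]): T = {
    val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    try Await.result(body(ec), timeout) // all work finishes before the scope ends
    finally {
      ec.shutdown()                                                // stop accepting new tasks
      ec.awaitTermination(timeout.toMillis, TimeUnit.MILLISECONDS) // let stragglers drain
    }
  }
}
```

Usage looks much like the library version, except that the block must end with a Future covering everything it started:

    val total = LoanedPools.withFixedThreadPool(4, 10.seconds) { implicit ec =>
      Future.sequence((1 to 10).map(i => Future(i * i))).map(_.sum)
    }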

Rather than using an ExecutorService that blocks outright, you can use one that slows things down by forcing the task-scheduling (Future-creating) Thread to execute the task itself rather than running it asynchronously. You'd build a java.util.concurrent.ThreadPoolExecutor with a bounded work queue and ThreadPoolExecutor.CallerRunsPolicy as its rejection handler, so that once the queue is full, new tasks run on the caller's Thread. But ThreadPoolExecutor is fairly complex to build directly.
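Building one is mostly a matter of knowing which constructor arguments matter. A sketch, with hypothetical names (`CallerRunsThrottle`, `newThrottledPool`, `demo`) and arbitrary sizes: a fixed number of threads, a bounded `ArrayBlockingQueue`, and `CallerRunsPolicy`. Once the queue is full, the next `execute` runs the task on the submitting thread, so the loop creating Futures cannot race ahead of the workers by much more than the queue capacity.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CallerRunsThrottle {
  // Fixed-size pool whose work queue holds at most `queueCapacity` waiting tasks.
  // When the queue is full, CallerRunsPolicy runs the rejected task on the
  // submitting thread, which throttles the code that is creating Futures.
  def newThrottledPool(threads: Int, queueCapacity: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      threads, threads,                                // core and maximum pool size
      0L, TimeUnit.MILLISECONDS,                       // keep-alive (unused when core == max)
      new ArrayBlockingQueue[Runnable](queueCapacity), // bounded work queue
      new ThreadPoolExecutor.CallerRunsPolicy()        // overflow runs on the caller
    )

  // Creates n small Futures through a 4-thread pool with a 100-slot queue; returns their sum.
  def demo(n: Int): Long = {
    val pool = newThrottledPool(4, 100)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = (1 to n).map(i => Future(i.toLong))
      futures.map(f => Await.result(f, 30.seconds)).sum
    } finally pool.shutdown()
  }
}
```

Unlike the blocking approach, no thread ever waits for a slot; the cost is that the submitting thread is sometimes busy doing work itself. Also note that CallerRunsPolicy silently discards tasks once the executor has been shut down, so only shut the pool down after you have finished submitting.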

A newer, sexier, more Scala-centric option would be to check out Akka Streams as an alternative to Futures for concurrent execution, with "back-pressure" to prevent OutOfMemoryErrors.

Steve Waldman