Let us create two thread pools of different sizes:
val fiveThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
val tenThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
We can control which thread pool a future runs on by passing the execution context explicitly as an argument to the future, like so
Future(42)(tenThreadsEc)
This is equivalent to
Future.apply(body = 42)(executor = tenThreadsEc)
which corresponds to the signature of Future.apply
def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
Note how the executor parameter is declared as implicit. This means we could provide it implicitly like so
implicit val tenThreadsEc = ...
Future(42) // executor = tenThreadsEc argument passed in magically
Now, as per Luis' suggestion, consider the simplified signature of Future.traverse
def traverse[A, B, M[X] <: IterableOnce[X]](in: M[A])(fn: A => Future[B])(implicit ..., executor: ExecutionContext): Future[M[B]]
Let us simplify it further by fixing the M type constructor parameter to, say, M = List:
def traverse[A, B]
(in: List[A]) // list of things to process in parallel
(fn: A => Future[B]) // function to process an element asynchronously
(implicit executor: ExecutionContext) // thread pool to use for parallel processing
: Future[List[B]] // returned result is a future of list of things instead of list of future things
Let's pass in all the arguments explicitly
val tenThreadsEc = ...
val myList: List[Int] = List(11, 42, -1)
def myFun(x: Int)(implicit executor: ExecutionContext): Future[Int] = Future(x + 1)(executor)
Future.traverse[Int, Int, List](
in = myList)(
fn = myFun(_)(executor = tenThreadsEc))(
executor = tenThreadsEc,
bf = implicitly // ignore this
)
Relying on implicit resolution and type inference, the call simplifies to
implicit val tenThreadsEc = ...
Future.traverse(myList)(myFun)
Putting it all together, here is a working example
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
object FuturesExample extends App {
val fiveThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
val tenThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val myList: List[Int] = List(11, 42, -1)
def myFun(x: Int)(implicit executor: ExecutionContext): Future[Int] = Future(x + 1)(executor)
Future(body = 42)(executor = fiveThreadsEc)
.andThen(v => println(v))(executor = fiveThreadsEc)
Future.traverse[Int, Int, List](
in = myList)(
fn = myFun(_)(executor = tenThreadsEc))(
executor = tenThreadsEc,
bf = implicitly
).andThen(v => println(v))(executor = tenThreadsEc)
// Using implicit execution context call-site simplifies to...
implicit val ec = tenThreadsEc
Future(42)
.andThen(v => println(v))
Future.traverse(myList)(myFun)
.andThen(v => println(v))
}
which outputs
Success(42)
Success(List(12, 43, 0))
Success(42)
Success(List(12, 43, 0))
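To see that the explicitly passed executor really determines where a task runs, we can print the name of the current thread inside the future body. Here is a small illustrative sketch (the object name and messages are made up; thread names such as pool-1-thread-1 come from the default thread factory used by Executors):
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
object WhichPoolRunsIt extends App {
  val fiveThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
  val tenThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
  // Each future prints the thread it runs on (e.g. pool-1-thread-1 vs pool-2-thread-1),
  // showing that the ExecutionContext passed explicitly decides where the body executes.
  Future(println(s"five-thread pool: ${Thread.currentThread().getName}"))(fiveThreadsEc)
  Future(println(s"ten-thread pool: ${Thread.currentThread().getName}"))(tenThreadsEc)
}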
Alternatively, Scala provides a default execution context called scala.concurrent.ExecutionContext.Implicits.global, and we can control its parallelism with the following system properties:
scala.concurrent.context.minThreads
scala.concurrent.context.numThreads
scala.concurrent.context.maxThreads
scala.concurrent.context.maxExtraThreads
For example, create the following ConfiguringGlobalExecutorParallelism.scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object ConfiguringGlobalExecutorParallelism extends App {
println(scala.concurrent.ExecutionContext.Implicits.global.toString)
Future.traverse(List(11,42,-1))(x => Future(x + 1))
.andThen(v => println(v))
}
and run it with
scala -Dscala.concurrent.context.numThreads=10 -Dscala.concurrent.context.maxThreads=10 ConfiguringGlobalExecutorParallelism.scala
which should output
scala.concurrent.impl.ExecutionContextImpl$$anon$3@cb191ca[Running, parallelism = 10, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
Success(List(12, 43, 0))
Note how parallelism = 10.
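If passing -D flags at the command line is not convenient, the same properties can also be set programmatically, assuming this is done before scala.concurrent.ExecutionContext.Implicits.global is first accessed (the global context appears to read these properties once, when it is lazily created). A sketch under that assumption (the object name is made up):
import scala.concurrent.Future
object ConfiguringGlobalParallelismProgrammatically extends App {
  // Assumption: the global execution context has not been used yet in this JVM,
  // so it will pick up these properties when it is lazily initialized below.
  System.setProperty("scala.concurrent.context.numThreads", "10")
  System.setProperty("scala.concurrent.context.maxThreads", "10")
  import scala.concurrent.ExecutionContext.Implicits.global
  println(scala.concurrent.ExecutionContext.Implicits.global.toString)
  Future.traverse(List(11, 42, -1))(x => Future(x + 1))
    .andThen(v => println(v))
}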
Another option is to use parallel collections
libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "0.2.0"
and configure parallelism via tasksupport, for example
val myParVector: ParVector[Int] = ParVector(11, 42, -1)
myParVector.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
myParVector.map(x => x + 1)
Note that parallel collections are a separate facility from Futures: the parallel collection design in Scala has no notion of an ExecutionContext; that is strictly a property of Future. Instead, the parallel collection library has a notion of a TaskSupport, which is responsible for scheduling inside the parallel collection. That is why we could map over the collection simply with x => x + 1 instead of x => Future(x + 1), and there was no need for Future.traverse; a regular map was sufficient.
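Putting the parallel-collections variant together in the same style as the earlier Futures example, a minimal self-contained sketch (the object name is made up; it assumes the scala-parallel-collections dependency shown above) could look like this:
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
object ParallelCollectionsExample extends App {
  val myParVector: ParVector[Int] = ParVector(11, 42, -1)
  // Limit the parallel operations on this collection to a 10-thread fork-join pool
  myParVector.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
  // A plain map is enough: no Future, no ExecutionContext, no Future.traverse
  println(myParVector.map(x => x + 1)) // should print ParVector(12, 43, 0)
}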