
I am new to Scala and have a general question about its Future concept.

Say I have a list of elements, and for each element in the list I have to invoke a method that does some processing.

We can use Future to do our processing in parallel, but my question is: how can we control the number of concurrent processing tasks running in parallel in the background?

For example, suppose I need to cap the number of parallel tasks at 10. At most, the futures should spawn processing for 10 elements of the list and then wait for any of the spawned tasks to complete. As slots free up, processing should be spawned for the remaining elements until the limit is reached again.

I searched Google but could not find an answer. In Unix, the same can be done by running processes in the background and manually checking the count using the ps command. Since I am not very familiar with Scala, please help me with this.

Thanks in advance.

ungalVicky
    That part is managed by the [**ExecutionContext**](https://www.scala-lang.org/api/current/scala/concurrent/ExecutionContext.html) you may create one with your desired size using a [**FixedThreadPool**](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-). - BTW, if you want to collect all the results as a single **Future** of a **List** of values, consider using `Future.traverse(list)(function)`. – Luis Miguel Mejía Suárez Dec 28 '19 at 03:21
  • @LuisMiguelMejíaSuárez lets say those futures are doing non-blocking IO using some other thread pool, then this "future parallelism control" will not work out, as we are controlling the "execution parallelism" and not the "task parallelism". – sarveshseri Dec 28 '19 at 18:54
  • @SarveshKumarSingh I do not follow what you mean, care to explain with an example? – Luis Miguel Mejía Suárez Dec 28 '19 at 18:58
  • a simple `Future(10).map(i => i + 100)(executor = otherExecutionContext).map(i => i + 10)`. That middle map with other execution context. – sarveshseri Dec 28 '19 at 19:08
  • @SarveshKumarSingh Yeah, so? Sorry I still do not understand what is your point. – Luis Miguel Mejía Suárez Dec 28 '19 at 19:13
  • Our "task" consists of 3 steps. Now, the parallelism of the second step cannot be controlled by our thread-pool, which in turn means that the parallelism of our tasks is not controlled by our thread-pool. – sarveshseri Dec 28 '19 at 19:20
  • Ah I get it, well yes. But if you make that other execution context implicit then all steps will have the same execution. In any case, **Future** doesn't really provide a way to control that. – Luis Miguel Mejía Suárez Dec 28 '19 at 19:26
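
The situation described in the comments above can be sketched as a small runnable example. The object and pool names below (`MixedPoolsSketch`, `cpuEc`, `ioEc`) are made up for illustration: the middle `map` is scheduled on the other pool, so sizing `cpuEc` alone does not bound the parallelism of that step.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object MixedPoolsSketch extends App {
  val cpuPool = Executors.newFixedThreadPool(10) // pool we size to control parallelism
  val ioPool  = Executors.newFixedThreadPool(4)  // some other pool, e.g. for IO
  val cpuEc   = ExecutionContext.fromExecutor(cpuPool)
  val ioEc    = ExecutionContext.fromExecutor(ioPool)

  // only the first and last steps run on cpuEc;
  // the middle map is scheduled on ioEc, outside cpuEc's control
  val f = Future(10)(cpuEc)
    .map(i => i + 100)(ioEc)
    .map(i => i + 10)(cpuEc)

  println(Await.result(f, Duration.Inf)) // prints 120

  cpuPool.shutdown(); ioPool.shutdown() // let the JVM exit
}
```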

1 Answer


Let us create two thread pools of different sizes:

val fiveThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
val tenThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

We can control which thread pool a future runs on by passing it as an argument to the future, like so:

Future(42)(tenThreadsEc)

This is equivalent to

Future.apply(body = 42)(executor = tenThreadsEc)

which corresponds to the signature of Future.apply

def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] =

Note how the executor parameter is declared as implicit. This means we could provide it implicitly like so

implicit val tenThreadsEc = ...
Future(42) // executor = tenThreadsEc argument passed in magically

Now, as per Luis' suggestion, consider the simplified signature of `Future.traverse`:

def traverse[A, B, M[X] <: IterableOnce[X]](in: M[A])(fn: A => Future[B])(implicit ..., executor: ExecutionContext): Future[M[B]]

Let us simplify it further by fixing the `M` type constructor parameter to, say, `M = List`:

def traverse[A, B]
  (in: List[A])                          // list of things to process in parallel
  (fn: A => Future[B])                   // function to process an element asynchronously
  (implicit executor: ExecutionContext)  // thread pool to use for parallel processing
: Future[List[B]]                        // returned result is a future of list of things instead of list of future things

Let's pass in the arguments

val tenThreadsEc = ...
val myList: List[Int] = List(11, 42, -1)
def myFun(x: Int)(implicit executor: ExecutionContext): Future[Int] = Future(x + 1)(executor)

Future.traverse[Int, Int, List](
  in       = myList)(
  fn       = myFun(_)(executor = tenThreadsEc))(
  executor = tenThreadsEc,
  bf       = implicitly               // ignore this
)

Relying on implicit resolution and type inference, we have simply

implicit val tenThreadsEc = ... 
Future.traverse(myList)(myFun)

Putting it all together, here is a working example

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object FuturesExample extends App {
  val fiveThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
  val tenThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val myList: List[Int] = List(11, 42, -1)
  def myFun(x: Int)(implicit executor: ExecutionContext): Future[Int] = Future(x + 1)(executor)

  Future(body = 42)(executor = fiveThreadsEc)
    .andThen(v => println(v))(executor = fiveThreadsEc)

  Future.traverse[Int, Int, List](
    in = myList)(
    fn = myFun(_)(executor = tenThreadsEc))(
    executor = tenThreadsEc,
    bf = implicitly
  ).andThen(v => println(v))(executor = tenThreadsEc)

  // Using implicit execution context call-site simplifies to...
  implicit val ec = tenThreadsEc

  Future(42)
    .andThen(v => println(v))

  Future.traverse(myList)(myFun)
    .andThen(v => println(v))
}

which outputs

Success(42)
Success(List(12, 43, 0))
Success(42)
Success(List(12, 43, 0))
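
To see that the fixed thread pool really does cap the number of tasks running at once, here is a hedged sketch (the object and counter names `ConcurrencyCapSketch`, `running`, `maxSeen` are made up for illustration) that records the highest concurrency observed while traversing a list:

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ConcurrencyCapSketch extends App {
  val pool = Executors.newFixedThreadPool(10)
  implicit val tenThreadsEc: ExecutionContext = ExecutionContext.fromExecutor(pool)

  val running = new AtomicInteger(0) // tasks currently executing
  val maxSeen = new AtomicInteger(0) // highest concurrency observed

  def process(x: Int): Int = {
    val now = running.incrementAndGet()
    maxSeen.updateAndGet(m => math.max(m, now))
    Thread.sleep(20) // simulate some work
    running.decrementAndGet()
    x + 1
  }

  Await.result(Future.traverse((1 to 100).toList)(x => Future(process(x))), Duration.Inf)
  println(s"max concurrent tasks: ${maxSeen.get}") // never exceeds the pool size of 10

  pool.shutdown() // let the JVM exit
}
```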

Alternatively, Scala provides a default execution context called

scala.concurrent.ExecutionContext.Implicits.global

and we can control its parallelism with system properties

scala.concurrent.context.minThreads
scala.concurrent.context.numThreads
scala.concurrent.context.maxThreads
scala.concurrent.context.maxExtraThreads

For example, create the following ConfiguringGlobalExecutorParallelism.scala

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object ConfiguringGlobalExecutorParallelism extends App {
  println(scala.concurrent.ExecutionContext.Implicits.global.toString)

  Future.traverse(List(11,42,-1))(x => Future(x + 1))
    .andThen(v => println(v))
}

and run it with

scala -Dscala.concurrent.context.numThreads=10 -Dscala.concurrent.context.maxThreads=10 ConfiguringGlobalExecutorParallelism.scala

which should output

scala.concurrent.impl.ExecutionContextImpl$$anon$3@cb191ca[Running, parallelism = 10, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
Success(List(12, 43, 0))

Note how parallelism = 10.

Another option is to use parallel collections

libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "0.2.0"

and configure parallelism via tasksupport, for example

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

val myParVector: ParVector[Int] = ParVector(11, 42, -1)
myParVector.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
myParVector.map(x => x + 1)

Note that parallel collections are a separate facility from Futures

parallel collection design in Scala has no notion of an ExecutionContext, that is strictly a property of Future. The parallel collection library has a notion of a TaskSupport which is responsible for scheduling inside the parallel collection

so we can map over the collection simply with `x => x + 1` instead of `x => Future(x + 1)`; there is no need for `Future.traverse`, a regular `map` suffices.

Mario Galic