I have 50,000 tasks and want to execute them with 10 threads. In Java I would create Executors.newFixedThreadPool(10), submit Runnables to it, and then wait for them all to finish. As I understand it, Scala is especially well suited to this kind of task, but I can't find the solution in the docs.
- How are you handling task failures? Are the tasks state-dependent or order-dependent, and why do you need a fixed thread pool (if you want the highest possible parallelism you'd go for available cores)? – Viktor Klang Dec 23 '10 at 13:25
- Isn't this question related to this one: http://stackoverflow.com/questions/15285284/how-to-configure-a-fine-tuned-thread-pool-for-futures The answer there seems to be a lot simpler? – Mark Butler Jun 16 '14 at 18:51
4 Answers
Scala 2.9.3 and later
The simplest approach is to use the scala.concurrent.Future class and its associated infrastructure. The scala.concurrent.future method asynchronously evaluates the block passed to it and immediately returns a Future[A] representing the asynchronous computation. Futures can be manipulated in a number of non-blocking ways, including mapping, flatMapping, filtering, recovering from errors, etc.
For example, here's a sample that creates 10 tasks, where each task sleeps an arbitrary amount of time and then returns the square of the value passed to it.
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// kick off ten asynchronous tasks; each sleeps, then returns its square
val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

// turn the Seq[Future[Int]] into a Future[Seq[Int]] and block for the results
val aggregated: Future[Seq[Int]] = Future.sequence(tasks)
val squares: Seq[Int] = Await.result(aggregated, 15.seconds)
println("Squares: " + squares)
In this example, we first create a sequence of individual asynchronous tasks that, when complete, provide an Int. We then use Future.sequence to combine those async tasks into a single async task, swapping the position of the Future and the Seq in the type. Finally, we block the current thread for up to 15 seconds while waiting for the result. In the example, we use the global execution context, which is backed by a fork/join thread pool. For non-trivial examples, you probably would want to use an application-specific ExecutionContext.
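For illustration, a dedicated context might look roughly like this (the pool size of 10 just mirrors the question and is otherwise an assumption):

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// an application-specific context backed by a fixed pool instead of the global fork/join pool
implicit val appContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))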
Generally, blocking should be avoided when at all possible. There are other combinators available on the Future class that can help program in an asynchronous style, including onSuccess, onFailure, and onComplete.
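For example, a non-blocking version of the last step might look roughly like this (a sketch, assuming the 2.10-style onComplete that receives a scala.util.Try):

import scala.util.{Success, Failure}

// register a callback instead of blocking with Await.result
aggregated.onComplete {
  case Success(squares) => println("Squares: " + squares)
  case Failure(error)   => println("Computation failed: " + error)
}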
Also, consider investigating the Akka library, which provides actor-based concurrency for Scala and Java, and interoperates with scala.concurrent.
Scala 2.9.2 and prior
The simplest approach is to use Scala's Future class, which is a sub-component of the Actors framework. The scala.actors.Futures.future method creates a Future for the block passed to it. You can then use scala.actors.Futures.awaitAll to wait for all tasks to complete.
For example, here's a sample that creates 10 tasks, where each task sleeps an arbitrary amount of time and then returns the square of the value passed to it.
import scala.actors.Futures._

// kick off ten tasks; each sleeps, then returns its square
val tasks = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

// wait up to 20 seconds for all futures to complete
val squares = awaitAll(20000L, tasks: _*)
println("Squares: " + squares)

- I do not need to execute 10 tasks. I have 50,000 and want to execute them with 10 threads. – yura Dec 22 '10 at 16:32
- When using futures, by default tasks are executed in a fork/join scheduler that allocates at most 2 threads for each processor reported by the JVM. The max number of threads can be increased via the actors.corePoolSize system property, or the entire scheduler can be replaced. See the ScalaDoc of Actor for details. – mpilquist Dec 22 '10 at 16:42
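(For illustration, raising that limit is a one-liner, assuming it runs before the scheduler is first used:)

// mirrors the property named in the comment above; the value 10 just matches the question
System.setProperty("actors.corePoolSize", "10")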
- The _* syntax handles converting the tasks variable to a var-args call on awaitAll. The awaitAll method takes a var-arg Future[Any] and the tasks variable is an IndexedSeq[Future[Int]]. Adding _* tells the compiler to expand tasks as varargs. – mpilquist Dec 22 '10 at 18:18
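(A minimal standalone illustration of the _* ascription, unrelated to futures:)

def sum(xs: Int*): Int = xs.sum  // a varargs method
val nums = Seq(1, 2, 3)
sum(nums: _*)                    // _* expands the Seq into varargs: sum(1, 2, 3)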
You want to look at either the Scala actors library or Akka. Akka has cleaner syntax, but either will do the trick.
So it sounds like you need to create a pool of actors that know how to process your tasks. An Actor can basically be any class with a receive method - from the Akka tutorial (http://doc.akkasource.org/tutorial-chat-server-scala):
import akka.actor.Actor

class MyActor extends Actor {
  def receive = {
    case "test" => println("received test")
    case _      => println("received unknown message")
  }
}

val myActor = Actor.actorOf[MyActor]
myActor.start
You'll want to create a pool of actor instances and fire your tasks to them as messages. Here's a post on Akka actor pooling that may be helpful: http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/
In your case, one actor per task may be appropriate (actors are extremely lightweight compared to threads so you can have a LOT of them in a single VM), or you might need some more sophisticated load balancing between them.
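As a rough sketch of creating such a pool with the calls shown above (the size of 10 is just an assumption matching the question; a real application would likely use one of Akka's load-balancing routers instead):

// ten worker instances of the MyActor class from above
val workers = Vector.fill(10) {
  val worker = Actor.actorOf[MyActor]
  worker.start
  worker
}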
EDIT: Using the example actor above, sending it a message is as easy as this:
myActor ! "test"
The actor will then output "received test" to standard output.
Messages can be of any type, and when combined with Scala's pattern matching, you have a powerful pattern for building flexible concurrent applications.
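For instance, a hypothetical worker that matches on a typed message rather than a raw string (Square and SquareActor are made-up names for illustration):

import akka.actor.Actor

case class Square(n: Int)

class SquareActor extends Actor {
  def receive = {
    case Square(n) => println(n + " squared is " + (n * n))
    case other     => println("received unknown message: " + other)
  }
}

val squarer = Actor.actorOf[SquareActor]
squarer.start
squarer ! Square(7)  // prints "7 squared is 49"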
In general Akka actors will "do the right thing" in terms of thread sharing, and for the OP's needs, I imagine the defaults are fine. But if you need to, you can set the dispatcher the actor should use to one of several types:
* Thread-based
* Event-based
* Work-stealing
* HawtDispatch-based event-driven
It's trivial to set a dispatcher for an actor:
import akka.actor.Actor
import akka.dispatch.Dispatchers

class MyActor extends Actor {
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch")
    .withNewThreadPoolWithBoundedBlockingQueue(100)
    .setCorePoolSize(10)
    .setMaxPoolSize(10)
    .setKeepAliveTimeInMillis(10000)
    .build

  def receive = {
    case _ => // message handling omitted in the original snippet
  }
}
See http://doc.akkasource.org/dispatchers-scala
In this way, you could limit the thread pool size, but again, the original use case could probably be satisfied with 50K Akka actor instances using default dispatchers and it would parallelize nicely.
This really only scratches the surface of what Akka can do. It brings a lot of what Erlang offers to the Scala language. Actors can monitor other actors and restart them, creating self-healing applications. Akka also provides Software Transactional Memory and many other features. It's arguably the "killer app" or "killer framework" for Scala.

Here's another answer similar to mpilquist's response, but without the deprecated API and with the thread settings specified via a custom ExecutionContext:
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._

val numJobs = 50000
val numThreads = 10

// customize the execution context to use the specified number of threads
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads))

// define the tasks
val tasks = for (i <- 1 to numJobs) yield Future {
  // do something more fancy here
  i
}

// aggregate and wait for the final result
val aggregated = Future.sequence(tasks)
val oneToNSum = Await.result(aggregated, 15.seconds).sum
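One practical note not in the original answer: the fixed pool's threads are non-daemon, so keep a handle on the executor and shut it down once the work is finished, roughly like this:

val pool = Executors.newFixedThreadPool(numThreads)
implicit val ec = ExecutionContext.fromExecutor(pool)
// ... build, aggregate and await the futures as above ...
pool.shutdown()  // lets the JVM exit once queued tasks have finished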

If you want to "execute them with 10 threads", then use threads. Scala's actor model, which is usually what people are talking about when they say Scala is good for concurrency, hides such details so you won't see them.
Using actors, or futures if all you have are simple computations, you'd just create 50,000 of them and let them run. You might be faced with issues, but they are of a different nature.
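For completeness, a sketch of the plain-threads route from Scala, assuming each task is a simple side-effecting Runnable (error handling and result collection omitted):

import java.util.concurrent.{Executors, TimeUnit}

// a fixed pool of 10 threads working through 50,000 Runnables
val pool = Executors.newFixedThreadPool(10)
(1 to 50000).foreach { i =>
  pool.submit(new Runnable {
    def run() { println("task " + i) }
  })
}
pool.shutdown()
pool.awaitTermination(1, TimeUnit.HOURS)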
