With Java I can create an ExecutorCompletionService with an executor and a bunch of tasks. This class arranges that submitted tasks are, upon completion, placed on a queue accessible using take. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Does Akka have something similar for managing Futures returned by actors?
2 Answers
This answer is for Scala only. In Scala, Future.sequence and Future.firstCompletedOf compose futures: each returns a new future that completes once all (sequence) or one (firstCompletedOf) of the underlying futures has completed, which matches the examples in the CompletionService API docs. This is safer than ecs.take().get() because nothing blocks if you use an onComplete listener; if you still want a blocking wait, use Await.result. So there is no need for a CompletionService: a list of futures is flexible enough and much safer. The equivalent of the first example:
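import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // some execution context

val solvers: List[() => Int] = ...
val futures = solvers.map(s => Future { s() }) // start all tasks
Future.sequence(futures).foreach { results => // runs only if every task succeeded
  results.foreach(use)
}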
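For a version that can be run as-is, here is a minimal sketch of the same idea. The three constant solvers and the names SequenceExample and solveAll are placeholders invented for illustration, and Await.result is used only so the demo can print before the JVM exits.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceExample {
  // Hypothetical solvers standing in for the elided list above.
  val solvers: List[() => Int] = List(() => 1, () => 2, () => 3)

  // Start every task, then combine them into a single future of all results.
  def solveAll(): Future[List[Int]] =
    Future.sequence(solvers.map(s => Future(s())))

  def main(args: Array[String]): Unit = {
    // Blocking here only so the demo prints before the JVM exits.
    val results = Await.result(solveAll(), 5.seconds)
    println(results) // results follow the order of `solvers`, not completion order
  }
}
```

Note that Future.sequence preserves input order, so unlike a completion queue it does not tell you which task finished first.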
Another example cancels the remaining tasks once the first one completes:
val solvers: List[Future[Int] => Int] = ... // some list of tasks; each receives a Future it can use to check whether it was interrupted
val (futures, cancels) = solvers.map(s => cancellableFuture(s)).unzip // cancellableFuture: see https://stackoverflow.com/questions/16020964/cancellation-with-future-and-promise-in-scala
Future.firstCompletedOf(futures).onComplete { result => // result is a Try[Int]
  cancels.foreach(_()) // cancel the tasks that are still running
  result.foreach(use)
}
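To see firstCompletedOf in isolation, here is a small sketch that uses promises in place of real tasks, so the completion order is controlled rather than left to thread timing. The names FirstCompletedExample, winner, slow and fast are made up for this illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedExample {
  // Two hypothetical tasks modelled as promises; only `fast` is ever completed.
  def winner(): Future[Int] = {
    val slow = Promise[Int]()
    val fast = Promise[Int]()
    val first = Future.firstCompletedOf(Seq(slow.future, fast.future))
    fast.success(2) // the composed future completes as soon as any input does
    first
  }

  def main(args: Array[String]): Unit =
    println(Await.result(winner(), 5.seconds)) // prints 2
}
```

The slow promise is simply left pending here: firstCompletedOf does not stop the losing futures, which is why the example above cancels them explicitly.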
As for Java, Akka provides an adaptation of Scala's futures: http://doc.akka.io/docs/akka/snapshot/java/futures.html
If you just want to process results sequentially as they complete, you can use an actor for that:
import akka.pattern.pipe // provides pipeTo; needs an implicit ExecutionContext in scope
val futures: List[Future[Int]] = ...
futures.foreach(_ pipeTo actor) // the actor's mailbox is used as the queue
To model the completion queue's behavior (which is not recommended):
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global // some execution context

class Queue[T](solvers: Seq[() => T]) extends Iterator[T] {
  // Pairs a result with the future that produced it, so that future can be dropped once consumed
  case class Result(f: Future[Result], r: T)

  var futures: Set[Future[Result]] = solvers.map { s =>
    lazy val f: Future[Result] = Future { Result(f, s()) }
    f
  }.toSet

  def hasNext: Boolean = futures.nonEmpty

  def next() = {
    // Blocks until any of the remaining futures completes
    val result = Await.result(Future.firstCompletedOf(futures.toSeq), Duration.Inf)
    futures -= result.f
    result.r
  }
}
scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
q: Queue[Int] = non-empty iterator
scala> q.next
res14: Int = 2
scala> q.next
res15: Int = 1
scala> q.foreach(println)
4
3
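The iterator above calls firstCompletedOf over all pending futures on every next(). A sketch that stays closer to what the ExecutorCompletionService docs describe (finished tasks are placed on a queue read with take) is to let each future enqueue its own outcome. CompletionQueue below is a hypothetical helper written for this illustration, not a Scala or Akka class, and take() still blocks the caller.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical helper, not part of Scala or Akka.
class CompletionQueue[T](tasks: Seq[() => T])(implicit ec: ExecutionContext) {
  private val done = new LinkedBlockingQueue[Try[T]]()

  // Start every task; each one enqueues its own outcome when it finishes.
  tasks.foreach(t => Future(t()).onComplete(r => done.put(r)))

  // Blocks until the next finished task is available, like ecs.take().
  def take(): Try[T] = done.take()
}

object CompletionQueueExample {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val q = new CompletionQueue[Int](List(() => 1, () => 2, () => 3))
    // Arrival order can differ between runs, so sort before printing.
    val results = List.fill(3)(q.take().get)
    println(results.sorted)
  }
}
```

Failures arrive as Failure values in the queue instead of being thrown from take(), so the caller decides whether to call get or handle the error.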
Here is a possible solution that does not use ExecutorCompletionService; maybe it will help you:
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent._
import scala.concurrent.duration._
import scala.util._
import scala.concurrent.{ExecutionContextExecutorService, ExecutionContext, Future}

class BatchedIteratorsFactory[S,R](M: Int, timeout: Duration) {
  implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
  val throttlingQueue = new LinkedBlockingQueue[Future[R]](M) // Can't put more than M elements to the queue
  val resultQueue = new LinkedBlockingQueue[Try[R]](M)
  val jobCounter = new AtomicLong(0)

  def iterator(input: Iterator[S])(job: S => R): Iterator[Try[R]] = {
    val totalWork = Future(input.foreach { elem =>
      jobCounter.incrementAndGet
      throttlingQueue.put(Future { job(elem) } andThen {
        case r => resultQueue.put(r); throttlingQueue.poll() // the order is important here!
      })
    })
    new Iterator[Try[R]] {
      override def hasNext: Boolean = jobCounter.get != 0 || input.hasNext
      override def next(): Try[R] = {
        jobCounter.decrementAndGet
        Option(resultQueue.poll(timeout.toMillis, TimeUnit.MILLISECONDS)).getOrElse(
          throw new TimeoutException(s"No task has been completed within ${timeout.toMillis} ms!")
        )
      }
    }
  }
}
So you can use it like this:
val job = { (elem: Int) =>
  val result = elem * elem
  Thread.sleep(1000L) // some possible computation...
  result
}
val src = Range(1, 16).iterator
val it = new BatchedIteratorsFactory[Int, Int](M = 3, timeout = 4.seconds)
  .iterator(src)(job)
