With Java I can create an ExecutorCompletionService with an executor and a bunch of tasks. This class arranges for submitted tasks to be placed, upon completion, on a queue that is accessible via take. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Does Akka have something similar for managing the Futures returned by actors?

theseadroid

2 Answers

This answer is for Scala only. In Scala, futures can be composed with Future.sequence and Future.firstCompletedOf, which return a new future that completes once all (sequence) or the first (firstCompletedOf) of the underlying futures have completed. This corresponds to the examples in the CompletionService API docs. The approach is safer than ecs.take().get() because nothing blocks if you attach an onComplete listener; if you still want a blocking wait, use Await.result. So there is no need for a CompletionService: a list of futures is flexible enough and much safer. The equivalent of the first example:

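  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success}

  val solvers: List[() => Int] = ...
  val futures = solvers.map(s => Future { s() }) // start every task
  Future.sequence(futures).onComplete {
    case Success(results) => results.foreach(use) // all tasks have finished
    case Failure(e)       => e.printStackTrace()  // at least one task failed
  }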
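To try the snippet above end to end, here is a minimal self-contained sketch. The solver list and the SequenceDemo object are hypothetical stand-ins for the elided parts, and Await.result appears only so that the demo does not exit before the results arrive.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo {
  // Hypothetical stand-ins for the elided solver list.
  val solvers: List[() => Int] = List(() => 1, () => 2, () => 3)

  // Starts every solver, then returns one future holding all results in input order.
  def solveAll(): Future[List[Int]] =
    Future.sequence(solvers.map(s => Future(s())))

  def main(args: Array[String]): Unit = {
    // Blocking here only keeps the demo alive until the results arrive.
    val results = Await.result(solveAll(), 5.seconds)
    results.foreach(println)
  }
}
```

Note that Future.sequence hands over all results at once, in the order of the input list, rather than one by one as tasks finish.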

The second example, which cancels the remaining tasks once the first one completes:

  import scala.util.{Failure, Success}

  val solvers: List[Future[Int] => Int] = ... // tasks; each receives its own future so it can check whether it was cancelled
  // cancellableFuture: see https://stackoverflow.com/questions/16020964/cancellation-with-future-and-promise-in-scala
  val (futures, cancels) = solvers.map(s => cancellableFuture(s)).unzip

  Future.firstCompletedOf(futures).onComplete {
    case Success(result) =>
      cancels.foreach(_()) // cancel the tasks that are still running
      use(result)
    case Failure(_) =>
      cancels.foreach(_())
  }
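The cancellableFuture helper is not defined in this answer. One possible shape, modeled on the linked question, is sketched below; the CancelDemo object and the slow and fast tasks are hypothetical, and the helper uses the global execution context for simplicity. Cancelling here only fails the promise: a task has to poll its own future and stop by itself.

```scala
import java.util.concurrent.CancellationException
import scala.concurrent.{Await, Future, Promise, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CancelDemo {
  // Assumed helper: returns the task's future plus a function that cancels it.
  def cancellableFuture[T](task: Future[T] => T): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    p.completeWith(Future(task(f))) // ignored if the promise was cancelled first
    (f, () => p.tryFailure(new CancellationException))
  }

  // Keeps "working" until its own future has been completed, i.e. cancelled.
  def slow(self: Future[Int]): Int = {
    while (!self.isCompleted) blocking { Thread.sleep(10) }
    -1
  }

  def fast(self: Future[Int]): Int = 42

  def firstResult(): Future[Int] = {
    val tasks = List[Future[Int] => Int](slow, fast)
    val (futures, cancels) = tasks.map(t => cancellableFuture(t)).unzip
    val first = Future.firstCompletedOf(futures)
    first.onComplete(_ => cancels.foreach(c => c())) // cancel whatever is still running
    first
  }
}
```

Calling cancel on a future that has already completed is harmless, because tryFailure simply returns false in that case.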

As for Java, Akka provides an adaptation of Scala's futures: http://doc.akka.io/docs/akka/snapshot/java/futures.html

If you just want to process the results sequentially as they complete, you can use an actor for that:

  import akka.pattern.pipe

  val futures: List[Future[Int]] = ...
  futures.foreach(_ pipeTo actor) // the actor's mailbox is used as the queue

To model the completion queue's blocking behavior (which is not recommended):

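  import scala.concurrent._
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global // some execution context

  class Queue[T](solvers: Seq[() => T]) extends Iterator[T] {
    // Pairs a result with the future that produced it, so that future can be removed later
    case class Result(f: Future[Result], r: T)

    // Futures that have been started but whose results have not been consumed yet
    var futures: Set[Future[Result]] = solvers.map { s =>
      lazy val f: Future[Result] = Future { Result(f, s()) }
      f
    }.toSet

    def hasNext: Boolean = futures.nonEmpty

    // Blocks until any pending future completes; rethrows if that future failed
    def next(): T = {
      val result = Await.result(Future.firstCompletedOf(futures.toSeq), Duration.Inf)
      futures -= result.f
      result.r
    }
  }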

scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
q: Queue[Int] = non-empty iterator

scala> q.next
res14: Int = 2

scala> q.next
res15: Int = 1

scala> q.foreach(println)
4
3
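
For comparison, a closer analog of ExecutorCompletionService can be sketched by letting every future push its outcome onto a java.util.concurrent.LinkedBlockingQueue when it completes, so that the consumer takes results in completion order without rescanning the pending futures. The CompletionQueue name is hypothetical, and this is only a sketch under the assumption that blocking the consuming thread is acceptable.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

// Hypothetical helper: yields task outcomes in the order the tasks finish.
class CompletionQueue[T](solvers: Seq[() => T]) extends Iterator[Try[T]] {
  private val done = new LinkedBlockingQueue[Try[T]]()
  private var remaining = solvers.size

  // Start every task; each one enqueues its outcome (success or failure) on completion.
  solvers.foreach(s => Future(s()).onComplete(r => done.put(r)))

  def hasNext: Boolean = remaining > 0

  // Blocks the calling thread until the next task has finished.
  def next(): Try[T] = {
    if (!hasNext) throw new NoSuchElementException("all results consumed")
    remaining -= 1
    done.take()
  }
}
```

Returning Try[T] instead of T means that a failed task shows up as a Failure element rather than as an exception thrown from next().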
dk14

Here is a possible solution that does not use ExecutorCompletionService; maybe it will help you:

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

class BatchedIteratorsFactory[S,R](M: Int, timeout: Duration) {
  implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

  val throttlingQueue = new LinkedBlockingQueue[Future[R]](M) // Can't put more than M elements to the queue
  val resultQueue = new LinkedBlockingQueue[Try[R]](M)
  val jobCounter = new AtomicLong(0)

  def iterator(input: Iterator[S])(job: S => R): Iterator[Try[R]] = {
    val totalWork = Future(input.foreach { elem =>
      jobCounter.incrementAndGet
      throttlingQueue.put(Future { job(elem) } andThen {
        case r => resultQueue.put(r); throttlingQueue.poll() // the order is important here!
      })
    })

    new Iterator[Try[R]] {
      // Best-effort check: `input` is also being consumed on another thread by totalWork
      override def hasNext: Boolean = jobCounter.get != 0 || input.hasNext
      override def next(): Try[R] = {
        jobCounter.decrementAndGet
        Option(resultQueue.poll(timeout.toMillis, TimeUnit.MILLISECONDS)).getOrElse(
          throw new TimeoutException(s"No task has been completed within ${timeout.toMillis} ms!")
        )
      }
    }
  }
}

So you can use it like this:

val job = { (elem: Int) =>
  val result = elem * elem
  Thread.sleep(1000L) // some possible computation...
  result
}

val src = Range(1, 16).iterator

val it = new BatchedIteratorsFactory[Int, Int](M = 3, timeout = 4.seconds)
  .iterator(src)(job)