
Here is the receive method of an actor I'm working on:

def receive = {
  case "begin" =>
    // Ask every worker, then combine the replies into a single future.
    val listOfFutures: IndexedSeq[Future[Any]] = workers.map(worker => worker ? Work("test"))
    val future: Future[IndexedSeq[Any]] = Future.sequence(listOfFutures)

    future onComplete {
      case Success(result) => println("Eventual result: " + result)
      case Failure(ex)     => println("Failure: " + ex.getMessage)
    }
  case msg => println("A message received: " + msg)
}

When the ask to one of the workers fails (for example, on a timeout), the future returned by Future.sequence completes with a failure. However, I want to know which worker(s) failed. Is there a more elegant way to do this than handling each element of listOfFutures individually, without Future.sequence?

Jacek Laskowski
Ömer Faruk Gül
  • Instead of using Futures you might also look into just receiving the individual replies and dealing with them. This Actor might also be a good fit for the FSM trait. – Roland Kuhn Apr 14 '15 at 06:28
  • There is also the "Future of Trys" method from [this answer](https://stackoverflow.com/questions/20874186/scala-listfuture-to-futurelist-disregarding-failed-futures#). – vishvAs vAsuki Oct 18 '17 at 04:51

2 Answers


You can use the future's recover method to map or wrap the underlying exception:

import scala.concurrent.{Future, ExecutionContext}

case class WorkerFailed(name: String, cause: Throwable) 
  extends Exception(s"$name - ${cause.getMessage}", cause)

def mark[A](name: String, f: Future[A]): Future[A] = f.recover {
  case ex => throw WorkerFailed(name, ex)
}

import ExecutionContext.Implicits.global

val f = (0 to 10).map(i => mark(s"i = $i", Future { i / i }))
val g = Future.sequence(f)

g.value  // once g has completed: Some(Failure(WorkerFailed: i = 0 - / by zero))
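
To read the worker name back out of the failed sequence, one option is to pattern match on the wrapping exception. The following is only a sketch under stated assumptions: the object name `MarkedFailure`, the helper `firstFailedWorker`, and the worker names are made up for illustration, and `Await.ready` is used purely to keep the demo synchronous. Note that the sequenced future carries a single exception, so this identifies at most one failed worker.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure

object MarkedFailure {
  // Same idea as above: wrap the original exception together with the worker's name.
  final case class WorkerFailed(name: String, cause: Throwable)
    extends Exception(s"$name - ${cause.getMessage}", cause)

  def mark[A](name: String, f: Future[A])(implicit ec: ExecutionContext): Future[A] =
    f.recover { case ex => throw WorkerFailed(name, ex) }

  // Hypothetical helper: returns the name carried by the sequence's failure, if any.
  def firstFailedWorker(): Option[String] = {
    import ExecutionContext.Implicits.global
    // Only worker0 fails here (division by zero).
    val marked = (0 to 3).map(i => mark(s"worker$i", Future(6 / i)))
    val all = Future.sequence(marked)
    // Blocking is for demonstration only; inside an actor, prefer onComplete or pipeTo.
    Await.ready(all, 5.seconds).value match {
      case Some(Failure(WorkerFailed(name, _))) => Some(name)
      case _                                    => None
    }
  }
}
```

Calling `MarkedFailure.firstFailedWorker()` yields the name attached by `mark`. If every failing worker needs to be reported, the futures have to be made non-failing before they are sequenced.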
0__

Thanks to @0__, I have come up with another solution that may be a better fit in some cases.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

case class WorkerDone(name: String)
case class WorkerFailed(name: String)

import ExecutionContext.Implicits.global

// Each future recovers to a WorkerFailed value, so a worker error no longer fails the sequence.
val f = (0 to 10).map { i =>
  Future { i / i; WorkerDone(s"worker$i") }.recover {
    case ex => WorkerFailed(s"worker$i")
  }
}
val futureSeq = Future.sequence(f)

futureSeq onComplete {
  case Success(list) =>
    list.collect { case result: WorkerFailed => result }
      .foreach(failed => println("Failed: " + failed.name))
  case Failure(ex) => println("Exception: " + ex.getMessage)
}

// Just to make sure the program doesn't end before onComplete is called.
Thread.sleep(2000L)

I'm not sure whether my example is good practice, but my aim is to know which workers failed, no matter how they failed.
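
A variation on the same idea, along the lines of the "Future of Trys" approach mentioned in the comments, keeps the original exception for each worker instead of discarding it. This is a rough sketch rather than a definitive implementation: `FutureOfTrys`, `lift`, `run`, and `failedNames` are hypothetical names, and the blocking `Await.result` is there only so the example can return a plain value.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object FutureOfTrys {
  // Hypothetical helper: convert a possibly failing future into one that
  // always succeeds and carries the outcome as a Try.
  def lift[A](f: Future[A])(implicit ec: ExecutionContext): Future[Try[A]] =
    f.map[Try[A]](Success(_)).recover { case NonFatal(ex) => Failure(ex) }

  // Pair every worker name with its outcome; worker0 fails with a division by zero.
  def run(): Seq[(String, Try[Int])] = {
    import ExecutionContext.Implicits.global
    val named = (0 to 3).map { i =>
      val name = s"worker$i"
      lift(Future(6 / i)).map(outcome => name -> outcome)
    }
    // Blocking is for demonstration only.
    Await.result(Future.sequence(named), 5.seconds)
  }

  // Names of all workers whose outcome is a Failure.
  def failedNames(results: Seq[(String, Try[Int])]): Seq[String] =
    results.collect { case (name, Failure(_)) => name }
}
```

Because every lifted future succeeds, the sequence completes with all outcomes, and both the failed names and their causes remain available to the caller.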

Ömer Faruk Gül