
I am using Futures in Scala on the Play framework, and I am having trouble getting a partial result when a timeout occurs while merging multiple futures. My code creates two futures, one per provider query, merges their results with a for/yield expression, and then awaits the merged result with a timeout. This works fine when both providers reply in time. However, if only one provider replies in time, the await times out, and I still need to retrieve the data returned by the provider that did reply. How can I do that?

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)

// Both queries start running as soon as the futures are created
val future1 = Future(QueryProvider(provider1, request1))
val future2 = Future(QueryProvider(provider2, request2))

// The merged future completes only when both providers have replied
val future = for {
  result1 <- future1
  result2 <- future2
} yield Merge(result1, result2)

val duration = Duration(60000, MILLISECONDS)

try {
  result = Await.result(future, duration).asInstanceOf[String]
} catch {
  case _: Exception => println("time out...")
  // How can I retrieve provider1's result here if only provider2 timed out?
}
le-doude
user2512057
  • What about using Future.recover() to return an empty "mergeable" result on timeout? http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future – le-doude Dec 27 '13 at 07:56
  • You can nearly always avoid using Await - treat the use of Await as a code smell. – Christopher Hunt Dec 27 '13 at 20:11

1 Answer


You could use `after` from Akka instead of blocking on `Await.result`:

val timeout =
  akka.pattern.after(FiniteDuration(60000, MILLISECONDS), using = system.scheduler){
    Future.successful(None)
  }

val resFuture = for {
    result1 <- Future firstCompletedOf Seq(future1.map{Some(_)}, timeout)
    result2 <- Future firstCompletedOf Seq(future2.map{Some(_)}, timeout)
} yield (result1, result2)

val result = resFuture map {
  case (Some(r1), Some(r2)) => Merge(r1, r2)
  case (Some(r1), None) => PartialResult1(r1)
  case (None, Some(r2)) => PartialResult2(r2)
  case _ => EmptyResult
}

In this case resFuture completes within 60 seconds at most, so you can process a partial result. Also, you don't need Await in Play: you can use Async.
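For readers without Akka on the classpath, here is a minimal sketch of the same idea using only the standard library. The `after` helper below is a hypothetical stand-in for `akka.pattern.after`, built on a JDK scheduled executor; `orNone`, `merge`, and `query` are illustrative names, and the `String` results stand in for whatever the providers actually return.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object PartialMergeSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Daemon scheduler thread, so it does not keep the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
  })

  // Hypothetical stand-in for akka.pattern.after: completes with `value` once `delay` has elapsed.
  def after[T](delay: FiniteDuration)(value: => T): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(
      new Runnable { def run(): Unit = { p.trySuccess(value); () } },
      delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Yields Some(result) if `f` finishes first, or None if the shared timeout fires first.
  def orNone[T](f: Future[T], timeout: Future[Option[Nothing]]): Future[Option[T]] =
    Future.firstCompletedOf(Seq[Future[Option[T]]](f.map(v => Some(v)), timeout))

  // Illustrative merge that tolerates missing inputs.
  def merge(r1: Option[String], r2: Option[String]): String = (r1, r2) match {
    case (Some(a), Some(b)) => s"$a+$b"
    case (Some(a), None)    => s"$a only"
    case (None, Some(b))    => s"$b only"
    case _                  => "empty"
  }

  // Both inputs race against one shared timeout, so the result is ready after at most `limit`.
  def query(f1: Future[String], f2: Future[String], limit: FiniteDuration): Future[String] = {
    val timeout = after[Option[Nothing]](limit)(None)
    val o1 = orNone(f1, timeout)
    val o2 = orNone(f2, timeout)
    for { r1 <- o1; r2 <- o2 } yield merge(r1, r2)
  }

  def main(args: Array[String]): Unit = {
    val fast = Future.successful("provider1")
    val slow = Promise[String]().future // never completes, simulating a provider that hangs
    // Await is used only so this demo can print before the JVM exits.
    println(Await.result(query(fast, slow, 200.millis), 2.seconds))
  }
}
```

Because both futures race against the same timeout future, the total wait is bounded by one `limit` rather than one per provider.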

If you have many futures of the same type, you can apply the same pattern like this:

val futures: Seq[Future[Int]] = ???
val futureWithTimeout =
  futures.map{ f => Future firstCompletedOf Seq(f.map{Some(_)}, timeout) }

val result: Future[Seq[Option[Int]]] = Future.sequence(futureWithTimeout)

// In case you need to know the index of each completed or timed-out future
val indexedResults = result.map{_.zipWithIndex}

// In case you only need the completed results
val completedResults: Future[Seq[Int]] = result.map{_.flatten}

Types here are only for illustration.
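As a small sketch of the post-processing step, assuming the timed-out entries are already represented as `None` in a `Future[Seq[Option[Int]]]`: the object and method names below are illustrative, not part of any library. Note that flattening drops the `Option` wrapper, so the completed results have type `Future[Seq[Int]]`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SeqResultsSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Pairs each slot with its position, so the caller can tell which future produced what.
  def indexed(result: Future[Seq[Option[Int]]]): Future[Seq[(Option[Int], Int)]] =
    result.map(_.zipWithIndex)

  // Positions of the futures that did not finish before the timeout.
  def timedOut(result: Future[Seq[Option[Int]]]): Future[Seq[Int]] =
    result.map(_.zipWithIndex.collect { case (None, i) => i })

  // Keeps only the values that arrived; flatten removes both the None entries and the Option wrapper.
  def completed(result: Future[Seq[Option[Int]]]): Future[Seq[Int]] =
    result.map(_.flatten)
}
```

All three helpers stay inside the `Future`, so the sequence is transformed with `map` rather than extracted with `Await`.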

senia
  • Given the use of Play you have Akka on the classpath so the above example is great. Where there is no Akka you need an alternate scheduler. This resource is helpful there: http://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout – Christopher Hunt Dec 27 '13 at 20:11
  • Yeah, that helps. I am trying to try it out. I am having trouble to use it. The compiler tells me Not Found System for using = system.scheduler. I should import what? I cannot find a centralized doc which can tell me which package to import when using it's function. – user2512057 Dec 27 '13 at 21:14
  • @user2512057: [`play.api.libs.concurrent.Akka.system`](http://www.playframework.com/documentation/2.2.x/api/scala/index.html#play.api.libs.concurrent.Akka$). See [Integrating with Akka](http://www.playframework.com/documentation/2.2.x/ScalaAkka) in Play documentation. – senia Dec 27 '13 at 21:20
  • Made some progress. Now I can execute PartialResult1() when future2 times out. But as you suggested, I want to replace Await with Async by following the official example at the link you provided above. However, I always get an error: "value async is not a member of object play.api.mvc.Action" with the following code: def index() = Action.async {.....}. I imported all the packages required by the example. – user2512057 Dec 28 '13 at 05:12
  • @user2512057: what is your `Play` version? In older versions it should be `Action { Async { ... } }`. See combobox `Browse versions` in `Play` documentation. – senia Dec 28 '13 at 05:29
  • Yeah, mine is an older version 2.1.x. Sorry I am new and didn't know the Browse version in Play document. Eventually, my issue is resolved. Thanks user2512057!!! Just one more thing: I have more than 15 futures actually(I just showed 2 as example). If I write it as below, it is a really long combination. Is there any better way to do it? case (Some(r1), Some(r2), ......Some(r14), Some(r15)) => case (Some(r1), Some(r2), ......Some(r14), None) => – user2512057 Dec 29 '13 at 06:11
  • @user2512057: `Thanks user2512057!!!`... `user2512057` is you - I have to mention your current nick for you to get a notification. See update in my answer. – senia Dec 29 '13 at 08:49
  • @Senia: Thanks Senia! I was out for several days and didn't get time to try it out. Just one question before I try out: How do I get Seq[Option[Int]] from Future[Seq[Option[Int]]], so that I can use it. – user2512057 Jan 02 '14 at 04:12
  • @user2512057: It depends on how you want to use it. In general - just use `map`: `val futureResult = for {seq <- futureSeq} yield {use seq here}`. – senia Jan 02 '14 at 06:15
  • Here is what I need: Receive a request from client and do some calculation and return. In the last sentence, I need to send back a list of Int to client. Async{ val futures: Seq[Future[Int]] = ??? val futureWithTimeout = futures.map{ f => Future firstCompletedOf Seq(f.map{Some(_)}, timeout) } val result: Future[Seq[Option[Int]]] = Future.sequence(futureWithTimeout) val completedResults: Future[Seq[Option[Int]]] = result.map{_.flatten} Ok(views.html.test(/*here I need input parameter List[Int]*/)) } – user2512057 Jan 03 '14 at 04:39
  • Sorry didn't get time to close this thread. Yes, above works. Only one thing is we need to take out 'option': val completedResults: Future[Seq[Option[Int]]] = result.map{_.flatten} – user2512057 Feb 17 '14 at 02:06