5

I want to add an after(d: FiniteDuration)(callback: => Unit) util to Scala Futures that would enable me to do this:

val f = Future(someTask)

f.after(30.seconds) {
  println("f has not completed in 30 seconds!")
}

f.after(60.seconds) {
  println("f has not completed in 60 seconds!")
}

How can I do this?

pathikrit

4 Answers

1

Usually I use a thread pool executor and promises:

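import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor, ThreadPoolExecutor}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

val f: Future[Int] = ???

// Two scheduler threads; rejected tasks (e.g. after shutdown) raise an exception
val executor = new ScheduledThreadPoolExecutor(2, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy)

// Runs `operation` on the scheduler after `by` and exposes its result as a Future
def withDelay[T](operation: => T)(by: FiniteDuration): Future[T] = {
  val promise = Promise[T]()
  executor.schedule(new Runnable {
    override def run(): Unit = {
      promise.complete(Try(operation))
    }
  }, by.length, by.unit)
  promise.future
}

// Note: each delayed println still runs even if f has already completed
Future.firstCompletedOf(Seq(f, withDelay(println("still going"))(30.seconds)))
Future.firstCompletedOf(Seq(f, withDelay(println("still still going"))(60.seconds)))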
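Because firstCompletedOf only selects a result and cancels nothing, the scheduled println above fires whether or not f is still running. A minimal sketch of a variant that skips the callback once f is done and cancels the pending task when f completes is shown below. The names FutureAfter, AfterOps and the "future-after" thread name are hypothetical, not part of any library; the daemon single-thread scheduler is an assumption made for the example.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper object; not part of the Scala standard library.
object FutureAfter {
  // One daemon thread is enough here because callbacks are expected to be cheap.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "future-after")
        t.setDaemon(true)
        t
      }
    })

  implicit class AfterOps[T](private val f: Future[T]) {
    // Runs `callback` once if `f` is still incomplete after `delay`; returns `f` unchanged.
    def after(delay: FiniteDuration)(callback: => Unit)(implicit ec: ExecutionContext): Future[T] = {
      val handle = scheduler.schedule(new Runnable {
        def run(): Unit = if (!f.isCompleted) callback
      }, delay.length, delay.unit)
      // Drop the pending task as soon as f finishes, so short-lived futures leave nothing queued.
      f.onComplete(_ => handle.cancel(false))
      f
    }
  }
}
```

With import FutureAfter._ and an implicit ExecutionContext in scope, the calls from the question, such as f.after(30.seconds) { ... }, should work as written. The callback runs on the scheduler thread, so a slow callback would delay other pending notifications.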
Ende Neu
0

One way is to use Future.firstCompletedOf (see this blogpost):

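val timeoutFuture = Future { Thread.sleep(500); throw new TimeoutException }

// Use a new name: `val f = ...firstCompletedOf(List(f, ...))` would refer to itself
val first = Future.firstCompletedOf(List(f, timeoutFuture))
first.failed.foreach {
  case _: TimeoutException => println("f has not completed in 0.5 seconds!")
  case _ => () // f itself failed with some other exception
}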

where TimeoutException is some exception or type.

Andy Hayden
  • But `firstCompletedOf` does not cancel the other future if the first one returns. So if most of my futures last a few milliseconds but I want to add a debug statement for after 30s, I will be creating a lot of Thread.sleep(30000) calls which won't be cancelled, right? – pathikrit Jun 09 '16 at 23:01
  • @pathikrit yes, but the result will be thrown away. If it's a non-blocking future (e.g. `val timeoutFuture = akka.pattern.after(500.milliseconds, using = system.scheduler) { ... }` from the blog post), then I don't think it's a problem (it's not blocking a thread). – Andy Hayden Jun 09 '16 at 23:33
0

Use import akka.pattern.after. If you want to implement it without Akka, here is the source code. Another (Java) example is TimeoutFuture in com.google.common.util.concurrent.
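For illustration, here is a minimal Akka-free sketch with roughly the same shape as akka.pattern.after: it returns a future that is completed with the given future's result once the delay has elapsed, without blocking a thread while waiting. This is not Akka's implementation. The object name DelayedFuture and the single daemon scheduler thread are assumptions made for the example.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical helper; loosely modelled on the shape of akka.pattern.after.
object DelayedFuture {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "delayed-future")
        t.setDaemon(true)
        t
      }
    })

  // `value` is evaluated only after `delay`; no thread sleeps in the meantime.
  def after[T](delay: FiniteDuration)(value: => Future[T]): Future[T] = {
    val promise = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        // If evaluating `value` throws, surface that as a failed future.
        val result = try value catch { case NonFatal(e) => Future.failed[T](e) }
        promise.completeWith(result)
      }
    }, delay.length, delay.unit)
    promise.future
  }
}
```

A timeout future built this way, for example DelayedFuture.after(500.millis)(Future.failed(new TimeoutException)), can be passed to Future.firstCompletedOf alongside f in place of the Thread.sleep version.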

Nikita
0

Something like this, perhaps:

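import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PimpMyFuture {
  implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal {
    def after(delay: FiniteDuration)(callback: => Unit): Future[T] = {
      // Watcher future: blocks a thread until f completes or the delay elapses
      Future {
        blocking { Await.ready(f, delay) }
      } recover { case _: TimeoutException => callback }
      f
    }
  }
}

import PimpMyFuture._
Future { Thread.sleep(10000); println("Done") }
  .after(5.seconds) { println("Still going") }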

This implementation is simple, but it roughly doubles the number of threads you need, since each active future effectively occupies two threads (one doing the work, one waiting on it), which is a bit wasteful. Alternatively, you could use scheduled tasks to make your waits non-blocking. I don't know of a "standard" scheduler in Scala (each library has its own), but for a simple task like this you can use Java's Timer and TimerTask directly:

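import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PimpMyFutureNonBlocking {
  val timer = new java.util.Timer

  implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal {
    def after(delay: FiniteDuration)(callback: => Unit): Future[T] = {
      val task = new java.util.TimerTask {
        def run(): Unit = if (!f.isCompleted) callback
      }
      timer.schedule(task, delay.toMillis)
      // Cancel the pending task as soon as f finishes
      f.onComplete { _ => task.cancel() }
      f
    }
  }
}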
Dima