There is an aspect of futures that I do not fully understand from the official tutorial: http://docs.scala-lang.org/overviews/core/futures.html

Do futures in Scala have a built-in timeout mechanism of some kind? Let's say the example below was a 5 gigabyte text file: does the implied scope of "Implicits.global" eventually cause onFailure to fire in a non-blocking way, or can that be defined? And without a default timeout of some kind, wouldn't that imply it's possible that neither success nor failure would ever fire?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}
paradigmatic
LaloInDublin
  • See [awaitAll](http://www.scala-lang.org/api/current/index.html#scala.actors.Futures%24) – Nikita Volkov Apr 30 '13 at 16:22
  • Bear in mind that none of these solutions will actually stop the `Future` from running. The only place you can stop a `Future` is from inside it. – Raul Apr 13 '15 at 15:40
  • @NikitaVolkov Your link no longer works. Tried to find the correct link but failed. – akauppi Dec 28 '16 at 12:03

14 Answers

You only get timeout behavior when you block to get the result of the Future. If you want to use the non-blocking callbacks onComplete, onSuccess or onFailure, you have to roll your own timeout handling. Akka has built-in timeout handling for request/response (?) messaging between actors, but I am not sure whether you want to start using Akka. FWIW, Akka handles timeouts by composing two Futures via Future.firstCompletedOf: one represents the actual async task and the other represents the timeout. If the timeout timer (a HashedWheelTimer) fires first, the async callback receives a failure.

A very simplified example of rolling your own might go something like this. First, an object for scheduling timeouts:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler {
  // A single timer with a 10 ms tick serves all scheduled timeouts
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
  def scheduleTimeout(promise: Promise[_], after: Duration) = {
    timer.newTimeout(new TimerTask {
      def run(timeout: Timeout): Unit = {
        // tryFailure is a no-op if the promise has already been completed
        promise.tryFailure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}

Then a function to take a Future and add timeout behavior to it:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}

Note that the HashedWheelTimer I am using here is from Netty.

cmbaxter
  • Thank you very much! Can you offer any general advice on how to handle futures (after the fact)? I am reading up on Akka, and various HTTP packages for Scala that use futures. It seems that at some point, in order to use a Future, a blocking event has to occur or the process has to be abandoned, but many tutorials seem to focus on the non-blocking call rather than doing anything practical with it after the fact. – LaloInDublin Apr 30 '13 at 20:16
  • You can absolutely build logic around the non-blocking use of Futures, and I suggest leaning in that direction as it performs considerably better. For instance, we use Unfiltered for our HTTP/REST layer. Calls come in and go to Akka Actors for servicing. We use non-blocking `onComplete` on the Future returned from the call to the actor and then complete the Unfiltered Netty async HTTP request. That's just one example (albeit not very detailed) of how to use the non-blocking callbacks for something real. – cmbaxter Apr 30 '13 at 21:17
  • @cmbaxter - thanks for this. I've tried to implement this (BTW - any licensing issues here?) but it doesn't seem to work. I'm trying to use it with linked futures, using map on the first future. 1. Can I have 2 timeouts? How? 2. I tried calling something like this but it seems to do nothing: TimeoutScheduler.withTimeout(createVMFuture)(global,2 seconds).recover{ case ex:TimeoutException => { logError("create vm timed out") } – YaOg Oct 15 '13 at 12:29
  • @cmbaxter Thanks for your `TimeoutScheduler`. I utilized in http://stackoverflow.com/questions/21983288/sequencing-futures-with-timeout but having the problem of non-termination. Maybe you have a second... – nemron Feb 24 '14 at 12:22
  • All this time I was under the impression that `Future` instances always have a hard timeout set, so I prefer `Actor` when I want to transfer very large data. Many thanks for the enlightenment. – lolski Feb 19 '16 at 04:09
  • If using Akka, it has a handy `akka.pattern.after` that can be used e.g. for timeouts: http://doc.akka.io/docs/akka/2.4/scala/futures.html#After – akauppi Dec 28 '16 at 12:13

All of these answers require additional dependencies. I decided to write a version using java.util.Timer which is an efficient way to run a function in the future, in this case to trigger a timeout.

Blog post with more details here

Using this with Scala's Promise, we can make a Future with timeout as follows:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Futures that use futureWithTimeout share the same Timer object.
  // It is thread safe and scales to thousands of active timers.
  // The true parameter makes the timer thread a daemon thread, so it does not
  // stop the program from shutting down.

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // The promise is completed by whichever finishes first: the caller's Future or the timer task
    val p = Promise[T]()

    // A timer task that fails the promise if the timeout elapses first
    val timerTask = new TimerTask() {
      def run(): Unit = {
        p.tryFailure(new TimeoutException())
      }
    }

    // Schedule the timeout check
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if (p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if (p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}
justinhj
  • I upvoted the answer. A minor thing is that `p` doesn't have to be mutable. A `val` would suffice. – fusion Feb 13 '18 at 12:48
  • Thank you, I've made the change here. – justinhj Feb 15 '18 at 20:04
  • Does this solution actually stop the Future in the background from running after the timeout? . – user2715182 Sep 15 '18 at 10:06
  • No, it does not cancel the Future. There is some code here that supports Future cancellation https://viktorklang.com/blog/Futures-in-Scala-protips-6.html You can also look at an effect library like Zio or Cats Effect – justinhj Nov 16 '20 at 02:57
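Since several comments point out that a timeout does not stop the underlying Future, here is a minimal sketch of cooperative cancellation, where the work itself checks a shared flag. The names `CooperativeCancel` and `runUntilCancelled` are made up for illustration and come from no library; this is one possible approach under those assumptions, not the way any of the answers here implement it.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CooperativeCancel {
  // Hypothetical helper: runs `step` up to maxSteps times, stopping early
  // once the shared flag is set. Returns the number of steps executed.
  def runUntilCancelled(cancelled: AtomicBoolean, maxSteps: Int)(step: Int => Unit)
                       (implicit ec: ExecutionContext): Future[Int] =
    Future {
      var i = 0
      // The work checks the flag itself; nothing outside can interrupt it
      while (i < maxSteps && !cancelled.get()) {
        step(i)
        i += 1
      }
      i
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val cancelled = new AtomicBoolean(false)
    val work = runUntilCancelled(cancelled, maxSteps = 1000)(_ => Thread.sleep(10))
    Thread.sleep(100)   // let the work run briefly
    cancelled.set(true) // e.g. called from a timeout handler
    val steps = Await.result(work, 5.seconds)
    println(s"Stopped after $steps of 1000 steps")
  }
}
```

The flag only helps if the work is split into steps that can check it; a single long blocking call would still run to the end.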

I've just created a TimeoutFuture class for a coworker:

TimeoutFuture

package model

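import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Try
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = Promise[A]()

    // timeout logic: fail the promise if it is still pending when the timer fires
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic: Try captures an exception thrown by block, and tryComplete
    // is a no-op if the timeout has already completed the promise
    Future {
      prom tryComplete Try(block)
    }

    prom.future
  }
}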

Usage

val future = TimeoutFuture(10 seconds) { 
  // do stuff here
}

future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

Notes:

  • Assumes Play! framework (but it's easy enough to adapt)
  • Every piece of code runs in the same ExecutionContext which may not be ideal.
Erik Kaplun
Pablo Fernandez
  • One issue I hit with this implementation was that if block threw an exception, and I did a Await.result(future, 5 seconds), a TimeoutException would be thrown rather than the underlying exception. I'm running this on scala 2.11, so I'm not sure why prom success block wouldn't complete the Future if block threw an exception. The way I worked around it was to do a try-catch on prom success block, and do prom failure e in the catch handler – anshumans Jul 15 '14 at 01:43
  • From looking at this more, I think the issue is that if block throws an exception, it's not being written to prom. – anshumans Jul 15 '14 at 17:14
  • @pablo As you mentioned, it requires the Play framework, and if I include this directly in a Scala application it throws "There is no started application", for which I would have to start a Play application etc. (which I don't want to do). Is any other scheduler available to run this? I need the functionality of a future with a timeout in Scala code. I am quite new to Scala. – Abhishek Anand Sep 16 '16 at 08:20
  • @AbhishekAnand It can be adapted, but I guess you'll be better off trying some of the other, already working, answers :) – Pablo Fernandez Sep 25 '16 at 22:50
  • @PabloFernandez, thanks for the reply, Pablo. I was able to make it work with the Java scheduler. However, I found that even if a timeout does occur, it only guarantees that you will get a response from the future. The Future itself will not be killed or stopped. So I just decided to let my app wait for x min/sec and let the future run its course. – Abhishek Anand Sep 26 '16 at 05:13
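To illustrate the issue raised in the comments above (an exception thrown by the block never reaching the promise), here is a small standalone sketch of how `Promise` completion behaves. It only uses the standard library; the object name `PromiseCompletionDemo` is made up for the example.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.Promise
import scala.util.Try

object PromiseCompletionDemo {
  def main(args: Array[String]): Unit = {
    // Case 1: the block throws. Try(...) turns the exception into a Failure,
    // so the promise is completed instead of being left pending.
    val p1 = Promise[Int]()
    p1.tryComplete(Try(throw new IllegalArgumentException("boom")))
    println(p1.future.value.exists(_.isFailure)) // true

    // Case 2: the timeout wins the race. A later tryComplete returns false...
    val p2 = Promise[Int]()
    p2.tryFailure(new TimeoutException("timed out"))
    println(p2.tryComplete(Try(42))) // false

    // ...whereas success on an already-completed promise throws.
    println(Try(p2.success(42)).isFailure) // true
  }
}
```

In other words, when two parties race to complete the same promise, the `try*` variants are the safer choice on both sides.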

The Play framework provides Promise.timeout, so you can write code like the following:

private def get(): Future[Option[Boolean]] = {
  val timeoutFuture = Promise.timeout(None, Duration("1s"))
  val mayBeHaveData = Future{
    // do something
    Some(true)
  }

  // if timeout occurred then None will be result of method
  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}
Kir

I'm quite surprised this is not standard in Scala. My version is short and has no dependencies:

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit class FutureTimeoutLike[T](f: Future[T]) {
    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
      Thread.sleep(ms)
      throw new TimeoutException
    }))

    lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
  }

}

Usage example

import FutureTimeout._
Future { /* do smth */ } withTimeout
Raul
  • Bear in mind that this will NOT stop the underlying future from executing! This can only be achieved from within the future. – Raul Jun 03 '15 at 16:52
  • @ScalaWilliam any suggestions? Blocking inside a future is considered safe. – Raul Jun 03 '15 at 16:53
  • Pablo Fernandez's answer is the right way to do this. – ScalaWilliam Jun 04 '15 at 12:50
  • The solution you refer to uses the Akka library, which in turn relies on Java's `Thread.sleep`, the most low-level way to pause the execution of a thread on a JVM. Ref: https://github.com/akka/akka/blob/ebc39ef9ab4440b98fe38b4994f8641d48128ee8/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L236 – Raul Jun 05 '15 at 10:29
  • @Raul The difference is that Akka uses one scheduling thread for all scheduled timeouts, while this solution creates one sleeping thread per call. – Konstantin Pelepelin Apr 23 '17 at 08:20
  • This one will deadlock if `f` uses the same `ExecutionContext` ;-) – user123 Jan 28 '20 at 09:27
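Following up on the comments about one sleeping thread per call, here is a rough sketch of the same idea using a single shared JDK `ScheduledExecutorService`, so that pending timeouts do not occupy threads of the `ExecutionContext`. The names `SharedTimer` and `withTimeout` are placeholders I chose for the example; treat it as one possible variant under those assumptions rather than a drop-in replacement.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object SharedTimer {
  // One daemon thread serves every timeout, instead of one sleeping thread per call
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "future-timeout")
        t.setDaemon(true)
        t
      }
    })

  // Hypothetical helper: fails with TimeoutException if `f` is not done in time
  def withTimeout[T](f: Future[T], after: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    val task = scheduler.schedule(new Runnable {
      def run(): Unit = { p.tryFailure(new TimeoutException(s"Timed out after $after")); () }
    }, after.toMillis, TimeUnit.MILLISECONDS)
    // Whichever completes first wins; cancel the timer task once `f` finishes
    f.onComplete { result =>
      p.tryComplete(result)
      task.cancel(false)
    }
    p.future
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val slow = Future { Thread.sleep(2000); "done" }
    // Await is used here only so the demo can print a result before exiting
    val result = Try(Await.result(withTimeout(slow, 200.millis), 5.seconds))
    println(result.isFailure) // true: the timeout fired before `slow` finished
  }
}
```

As with the other answers, `slow` keeps running after the timeout; only the returned future is failed early.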

If you want the writer (promise holder) to be the one who controls the timeout logic, use akka.pattern.after, in the following way:

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))

This way, if your promise completion logic never takes place, your caller's future will still be completed at some point with a failure.

galbarm

You can specify the timeout when you wait on the future:

For scala.concurrent.Future, the result method (invoked through Await.result) lets you specify a timeout.

For scala.actors.Future, Futures.awaitAll lets you specify a timeout.

I do not think there is a timeout built into the execution of a Future.
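As a small illustration of the blocking case, the sketch below waits on a slow future with a short timeout and then again with a longer one. It uses only the standard library; the object name `BlockingTimeout` and the sleep durations are arbitrary choices for the example.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingTimeout {
  def main(args: Array[String]): Unit = {
    val slow = Future { Thread.sleep(1000); 42 }

    // Await.result blocks the calling thread for at most 100 ms
    try {
      println(Await.result(slow, 100.millis))
    } catch {
      case _: TimeoutException =>
        // Only the wait gave up; `slow` itself keeps running
        println(s"Timed out; future completed yet? ${slow.isCompleted}")
    }

    // Waiting long enough yields the value
    println(Await.result(slow, 5.seconds)) // 42
  }
}
```

The timeout here belongs to the caller's wait, not to the future, which is why the second `Await.result` still gets the value.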

gzm0

Nobody has mentioned akka-streams yet. Flows have an easy completionTimeout method, and applying it to a single-source stream works like a Future.

In addition, akka-streams supports cancellation, so it can actually stop the source from running, i.e. it signals the timeout to the source.

akauppi

Monix Task has timeout support

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val source = Task("Hello!").delayExecution(10.seconds)

// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)

timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)
WeiChing 林煒清

This version works without using any external timer (just Await.result)

import scala.concurrent._
import scala.concurrent.duration.FiniteDuration

object TimeoutFuture {
    def apply[A](
        timeout: FiniteDuration
    )(block: => A)(implicit executor: ExecutionContext): Future[A] =
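        // Await.result throws inside the outer Future, so a try/catch around
        // Future { ... } never sees the exception; recoverWith handles it instead
        Future { Await.result(Future { block }, timeout) }.recoverWith {
            case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
        }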
}
unveloper
  • I'm impressed. While other solutions are sophisticated, this one really goes to the point and is pretty simple. – Germán Bouzas Aug 01 '19 at 14:45
  • What do you mean it works without using timeout? You clearly are using timeout – smac89 Sep 23 '19 at 01:31
  • I'm sorry, I wasn't very clear. I meant without using any external library to handle timeout like akka, netty or any kind of timer. – unveloper Sep 24 '19 at 06:39
  • You should avoid Await.result because it blocks a thread. That's why this is simpler than the other solutions. https://www.reactivesystems.eu/2019/02/19/dont-use-awaitresult.html – justinhj Jan 17 '22 at 01:29

I'm using this version (based on Play example above) which uses Akka system dispatcher:

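import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object TimeoutFuture {
  def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
    implicit val executionContext: ExecutionContext = system.dispatcher

    val prom = Promise[A]()

    // timeout logic
    system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future {
      try {
        // trySuccess is a no-op if the timeout has already failed the promise
        prom trySuccess block
      } catch {
        case t: Throwable => prom tryFailure t
      }
    }

    prom.future
  }
}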
PJ Fanning

The simplest way to specify a timeout on a Future, IMO, is Scala's built-in mechanism, scala.concurrent.Await.ready. It blocks the calling thread and throws a TimeoutException if the Future takes longer than the specified timeout; otherwise, it returns the Future itself. Here is a simple contrived example:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
val f1: Future[Int] = Future {
  Thread.sleep(1100)
  5
}

val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds)

val f: Future[Int] = Future {
  Thread.sleep(1100)
  5
}
val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds)
sparker

You can wait for a future to finish by making use of Await.

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

val meaningOfLife: Int = Await.result(Future(42), 1.nano)
println (meaningOfLife)

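The above prints 42 if the future has already completed by the time Await.result is called; with a timeout this short it may otherwise throw a TimeoutException.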

You may need an implicit ExecutionContext available in which case, just add:

import scala.concurrent.ExecutionContext.Implicits.global

Another way to do it is to use Coeval from monix. This method does not work in all situations, and you can read all about it here. The basic idea is that sometimes a future does not really take any time and just returns the result of a synchronous function call or value, so it can be evaluated on the current thread. This is also useful for testing and mocking futures. You also don't have to specify a timeout, which is to be expected, but it is still nice not to have to worry about it.

You start by transforming the future into a Task, wrap that task in a Coeval, and then cross your fingers as you wait to see what you get. This is a very simple example to show how it works:

You need an implicit Scheduler to be able to use it:

import monix.execution.Scheduler.Implicits.global


Coeval(Task.fromFuture(Future (42)).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

The above completes and prints 42 to the console.

Coeval(Task.fromFuture(Future {
   scala.concurrent.blocking {
      42
   }
}).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

The example above prints "Task did not finish".

smac89
You can simply run the future to completion without any timeout interval by setting the timeout to infinite, as below:

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
Await.result(run(executionContext), Duration.Inf)

The run function can be as below:

def run(implicit ec: ExecutionContext) = {
  val list = Seq(
    Future { println("start 1"); Thread.sleep(1000); println("stop 1") },
    Future { println("start 2"); Thread.sleep(2000); println("stop 2") },
    Future { println("start 3"); Thread.sleep(3000); println("stop 3") },
    Future { println("start 4"); Thread.sleep(4000); println("stop 4") },
    Future { println("start 5"); Thread.sleep(5000); println("stop 5") }
  )
  Future.sequence(list)
}
Nikunj Kakadiya