Let's say I have a future defined as shown below:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def appendCharWithTimeout(transformationId: String, char: Char, delayTimeMs: Long, delayTimes: Int) = (s: String) => {
  for (i <- 1 to delayTimes) {
    println(s"$transformationId waiting iteration $i ...")
    Thread.sleep(delayTimeMs)
  }
  s"$s$char"
}

Future("Hello ")
  .map( appendCharWithTimeout("mapJ", 'J', 200, 5) )
  .map( appendCharWithTimeout("mapO", 'o', 200, 5) )
  .map( appendCharWithTimeout("mapH", 'h', 200, 5) )
  .map( appendCharWithTimeout("mapN", 'n', 200, 5) )
  .map( appendCharWithTimeout("map!", '!', 200, 5) )

The execution time of this future is 5 seconds (5 transformations * 5 iterations * 200 ms).

I am looking for a way to wrap this future in some sort of "timeout context" that stops the execution once the timeout elapses, so that not all of the transformations are executed.

Ideally, I would like to have something like this:

Future("Hello ")
  .within(2 seconds)
  .map( appendCharWithTimeout("mapJ", 'J', 200, 5) )
  .map( appendCharWithTimeout("mapO", 'o', 200, 5) )
  .map( appendCharWithTimeout("mapH", 'h', 200, 5) )
  .map( appendCharWithTimeout("mapN", 'n', 200, 5) )
  .map( appendCharWithTimeout("map!", '!', 200, 5) )

And the output should be:

mapJ waiting iteration 1 ...
mapJ waiting iteration 2 ...
mapJ waiting iteration 3 ...
mapJ waiting iteration 4 ...
mapJ waiting iteration 5 ...
mapO waiting iteration 1 ...
mapO waiting iteration 2 ...
mapO waiting iteration 3 ...
mapO waiting iteration 4 ...
mapO waiting iteration 5 ...
Stephen L.

2 Answers


Here are a few ways of doing this:

0) Don't chain the Futures. The execution is sequential so just use a loop inside a single Future and track the total elapsed time in your loop.

1) Record the start time in a val outside the Future and use this to modify the timeout value given to appendCharWithTimeout so that the total execution time is not exceeded.

2) Have appendCharWithTimeout take a total execution time and return the time remaining to the next iteration. Use this to stop execution when the timeout is exceeded.

The choice depends on what the real code actually does, and whether you can change the code in appendCharWithTimeout.
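
As a rough illustration of option 0, here is a minimal sketch that runs all steps inside a single Future and checks a deadline before starting each one. The names `DeadlineChain`, `Step` and `runWithin` are made up for this example and are not part of any library; the sketch also assumes that returning the partial result is acceptable when the time budget runs out.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object DeadlineChain {
  // Hypothetical helper type: a label for the step plus the transformation itself.
  final case class Step(id: String, run: String => String)

  // Runs the steps in order inside a single Future and stops before the
  // first step that would start after the time budget has run out.
  def runWithin(initial: String, steps: List[Step], budget: FiniteDuration)(
      implicit ec: ExecutionContext): Future[String] =
    Future {
      val deadline = budget.fromNow // start counting when the Future begins to run
      var result = initial
      val remaining = steps.iterator
      while (remaining.hasNext && deadline.hasTimeLeft()) {
        result = remaining.next().run(result)
      }
      result // partial result if the deadline was hit, full result otherwise
    }
}
```

The deadline is only checked between steps, so a step that is already running is not interrupted. With the five one-second steps from the question and a budget of `2.seconds`, this should run roughly the first two transformations and skip the rest.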

Tim
  • Even if the `final-future` is computed by sequential execution of `chained-futures`, the time taken is still counted as computation time for the `final-future`, and you can `timeout` the `final-future` in a blocking way by using `Await` or in a non-blocking way using `Timer` and `Promise`. – sarveshseri Sep 26 '18 at 07:40
  • All the options mentioned rely on explicit time execution tracking. I am looking for an implicit way to do that. Found exactly what I needed in Twitter Futures - https://twitter.github.io/util/docs/com/twitter/util/Future.html#within(timer:com.twitter.util.Timer,timeout:com.twitter.util.Duration):com.twitter.util.Future[A] ... – Stephen L. Sep 26 '18 at 09:52

First of all, please do not mix Thread.sleep with futures. Futures work with an ExecutionContext, which schedules the computations on a thread pool. If your futures block those threads, the pool can run out of threads for other work, and this will lead to problems.

import java.util.{Timer, TimerTask}

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Completes with `result` once `timeout` has elapsed, without blocking a pool thread
def createFutureWithDelay[T](result: T, timeout: Duration): Future[T] = {
  val promise = Promise[T]()
  val timerTask = new TimerTask {
    override def run(): Unit = promise.success(result)
  }
  val timer = new Timer()
  timer.schedule(timerTask, timeout.toMillis)
  promise.future
}

// Starts `computation` in a Future and fails the result with `t` if it takes longer than `timeout`
def getNonBlockingFutureWithTimeout[T](computation: => T, timeout: Duration, t: Throwable): Future[T] = {
  val promise = Promise[T]()
  promise.completeWith(Future(computation))
  val timerTask = new TimerTask {
    // tryFailure is a no-op if the computation has already completed the promise
    override def run(): Unit = promise.tryFailure(t)
  }
  val timer = new Timer()
  timer.schedule(timerTask, timeout.toMillis)
  promise.future
}

// Same idea for an existing Future: whichever of `f` and the timer finishes first wins
def wrapFutureWithTimeout[T](f: Future[T], timeout: Duration, t: Throwable): Future[T] = {
  val promise = Promise[T]()
  promise.completeWith(f)
  val timerTask = new TimerTask {
    override def run(): Unit = promise.tryFailure(t)
  }
  val timer = new Timer()
  timer.schedule(timerTask, timeout.toMillis)
  promise.future
}

// A chained future that would take 10 minutes in total
val f = createFutureWithDelay(5, 5.minutes).flatMap(_ => createFutureWithDelay(5, 5.minutes))

// The wrapped future fails after 5 seconds instead
val f2 = wrapFutureWithTimeout(f, 5.seconds, new Throwable("ENDING with timeout"))

f2.onComplete({
  case Success(value) => println(s"success - $value")
  case Failure(t) => println(s"failure - ${t.getMessage}")
})
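
If the `.within(...)` call syntax from the question is preferred, the same Promise-plus-timer idea can be packaged as an extension method. The sketch below is only one possible way to do it: `FutureTimeoutSyntax` and `FutureWithin` are names invented for this example, and it uses a single shared `ScheduledExecutorService` on a daemon thread rather than a new `Timer` per call, so that each timeout does not leave a timer thread behind.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object FutureTimeoutSyntax {
  // One shared daemon scheduler for all timeouts (an assumption of this sketch).
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "future-timeout")
        t.setDaemon(true)
        t
      }
    })

  // Hypothetical extension: adds `within` to any scala.concurrent.Future.
  implicit class FutureWithin[T](underlying: Future[T]) {
    def within(timeout: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
      val promise = Promise[T]()
      // Fail the promise when the timeout fires, unless it is already completed.
      val pending = scheduler.schedule(new Runnable {
        def run(): Unit = {
          promise.tryFailure(new TimeoutException(s"timed out after $timeout"))
          ()
        }
      }, timeout.toMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
      // If the original future finishes first, cancel the timer and pass the result on.
      underlying.onComplete { result =>
        pending.cancel(false)
        promise.tryComplete(result)
      }
      promise.future
    }
  }
}
```

After `import FutureTimeoutSyntax._` the call has to go at the end of the chain, for example `Future("Hello ").map(...).map(...).within(2.seconds)`, because it only times out the future it is called on. As with `wrapFutureWithTimeout`, the returned future fails on timeout, but the underlying computation is not interrupted and the remaining `map` steps still run in the background.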
sarveshseri