35

I am trying to implement a scheduled future in Scala. I would like it to wait for a specific amount of time and then execute the body. So far I have tried the following simple approach:

val d = 5.seconds.fromNow

val f = future {Await.ready(Promise().future, d.timeLeft); 1}

val res = Await.result(f, Duration.Inf)

but I am getting a TimeoutException on the future. Is this even the correct approach, or should I simply use the ScheduledExecutor from Java?

Bober02

7 Answers

65

Akka has `akka.pattern.after`:

def after[T](duration: FiniteDuration, using: Scheduler)(value: ⇒ Future[T])(implicit ec: ExecutionContext): Future[T]

"Returns a scala.concurrent.Future that will be completed with the success or failure of the provided value after the specified duration."

http://doc.akka.io/api/akka/2.2.1/#akka.pattern.package

Viktor Klang
25

There is nothing to do that out of the box using the standard library alone. For most simple use cases, you can use a little helper such as this:

object DelayedFuture {
  import java.util.{Timer, TimerTask}
  import java.util.Date
  import scala.concurrent._
  import scala.concurrent.duration.FiniteDuration
  import scala.util.Try

  private val timer = new Timer(true)

  private def makeTask[T]( body: => T )( schedule: TimerTask => Unit )(implicit ctx: ExecutionContext): Future[T] = {
    val prom = Promise[T]()
    schedule(
      new TimerTask{
        def run(): Unit = {
          // IMPORTANT: The timer task just starts the execution on the passed
          // ExecutionContext and is thus almost instantaneous (making it
          // practical to use a single Timer - hence a single background thread).
          ctx.execute(
            new Runnable {
              def run(): Unit = {
                prom.complete(Try(body))
              }
            }
          )
        }
      }
    )
    prom.future
  }
  def apply[T]( delay: Long )( body: => T )(implicit ctx: ExecutionContext): Future[T] = {
    makeTask( body )( timer.schedule( _, delay ) )
  }
  def apply[T]( date: Date )( body: => T )(implicit ctx: ExecutionContext): Future[T] = {
    makeTask( body )( timer.schedule( _, date ) )
  }
  def apply[T]( delay: FiniteDuration )( body: => T )(implicit ctx: ExecutionContext): Future[T] = {
    makeTask( body )( timer.schedule( _, delay.toMillis ) )
  }
}

This can be used like this:

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._

DelayedFuture( 5 seconds )( println("Hello") )

Note that, unlike Java's scheduled futures, this implementation does not let you cancel the future.
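If cancellation matters, one possible extension is to keep a handle on the `TimerTask` and expose its `cancel()` method. The following is only a sketch under that assumption: `CancellableDelayedFuture` and the returned `cancel` function are hypothetical names, not part of the helper above.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.CancellationException
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

// Hypothetical variant of DelayedFuture that can be cancelled before it fires.
object CancellableDelayedFuture {
  // Daemon timer thread, as in DelayedFuture.
  private val timer = new Timer(true)

  // Returns the future together with a function that tries to cancel it.
  // The function returns true only if it prevented the body from being started.
  def apply[T](delay: FiniteDuration)(body: => T)(implicit ctx: ExecutionContext): (Future[T], () => Boolean) = {
    val prom = Promise[T]()
    val task = new TimerTask {
      def run(): Unit = ctx.execute(new Runnable {
        def run(): Unit = { prom.tryComplete(Try(body)); () }
      })
    }
    timer.schedule(task, delay.toMillis)
    val cancel = () => {
      // TimerTask.cancel() returns true if the task had not run yet.
      val cancelled = task.cancel()
      if (cancelled) prom.tryFailure(new CancellationException("delayed future cancelled"))
      cancelled
    }
    (prom.future, cancel)
  }
}
```

Calling the returned function before the delay elapses fails the future with a `CancellationException`; once the body has been handed to the `ExecutionContext`, it can no longer be stopped this way.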

Régis Jean-Gilles
  • 1
    I do not understand one thing: there is an `implicit ctx: ExecutionContext` argument to apply, but I cannot see where it would be used - I do not see `makeTask` or `timer.schedule` expecting it. – Suma Apr 01 '15 at 12:16
  • Thanks for pointing it out, that's clearly an error. My guess is that it is just a leftover from a refactoring I did (maybe at some point I had a `map`/`flatMap` somewhere for some reason), but it's been too long since I wrote this so I can't tell. I updated the snippet to remove the useless ExecutionContext. – Régis Jean-Gilles Apr 01 '15 at 13:25
  • 1
    I had a more serious look at it, and actually I definitely need to pass the ExecutionContext. The culprit was elsewhere: I should have run the body on the ExecutionContext instead of running it on the Timer's background thread (meaning that app-wide you effectively had a single thread for every delayed future, not good). Even if it was essentially just an example, it was sloppy on my part. The new version is more production-ready. – Régis Jean-Gilles Apr 02 '15 at 09:39
  • 1
    How about `def run(): Unit = prom.complete(Try(block()))` instead of manual exception wrapping? – Suma Jul 10 '15 at 08:54
  • Glaringly better indeed. I've updated the code, thank you. – Régis Jean-Gilles Jul 10 '15 at 09:12
  • 1
    Once [SAM](http://stackoverflow.com/questions/17913409/what-is-a-sam-type-in-java) is supported in Scala ([2.12?](http://www.scala-lang.org/news/2.12-roadmap)) your solution can hopefully become even clearer and shorter. – Suma Jul 10 '15 at 09:50
  • 1
    `ctx.execute{() => prom.complete(Try(body))}` would sure look good. – Régis Jean-Gilles Jul 10 '15 at 10:09
  • 2
    I notice that `private val timer = new Timer` uses the default of not running as a daemon thread, so it could possibly prevent the shutdown of a process that uses this code to schedule occasional tasks? Perhaps it would be best to use `private val timer = new Timer(true)` to prevent processes hanging? – simbo1905 Nov 22 '16 at 21:57
  • 1
    I think you're right, considering that the default execution contexts themselves use daemon threads. Thanks for the suggestion, I've updated my answer. – Régis Jean-Gilles Nov 23 '16 at 10:44
  • Nice, thanks. One minor thing is that to run your example in scala 2.12 I also needed `import scala.language.postfixOps`. – thund Apr 21 '18 at 21:03
19

If you want to schedule the completion without Akka, you can use a regular Java Timer to schedule a promise to complete:

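import java.util.{Timer, TimerTask}
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Completes the returned future with the result of `block` after `delay` milliseconds.
def delay[T](delay: Long)(block: => T): Future[T] = {
  val promise = Promise[T]()
  val t = new Timer()
  t.schedule(new TimerTask {
    override def run(): Unit = {
      promise.complete(Try(block))
    }
  }, delay)
  promise.future
}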
Arne Claassen
  • Are you sure about the syntax? I think `block` is not a function, so it should not be used as `block()`, but rather as `block`. – Suma Jul 10 '15 at 08:42
  • 1
    Also, similar to my comment to http://stackoverflow.com/a/16363444/16673 - are you using the `executor` somewhere? – Suma Jul 10 '15 at 08:44
  • 1
    @Suma that's what i get writing code from memory without testing. I've fixed the code and made sure it does what I claimed it to do – Arne Claassen Jul 10 '15 at 15:47
  • 1
    Creating a new `Timer` instance every time is probably not a very good idea – Régis Jean-Gilles Nov 23 '16 at 10:40
7

My solution is pretty similar to Régis's but I use Akka to schedule:

def delayedFuture[T](delay: FiniteDuration)(block: => T)(implicit executor: ExecutionContext): Future[T] = {
  val promise = Promise[T]()

  Akka.system.scheduler.scheduleOnce(delay) {
    try {
      val result = block
      promise.complete(Success(result))
    } catch {
      case t: Throwable => promise.failure(t)
    }
  }
  promise.future
}
agabor
5

You could change your code to something like this:

val d = 5.seconds.fromNow
val f = Future {delay(d); 1}
val res = Await.result(f, Duration.Inf)

def delay(dur:Deadline) = {
  Try(Await.ready(Promise().future, dur.timeLeft))
}

But I would not recommend it. In doing so, you would be blocking inside a Future (waiting for a Promise that will never complete), and blocking in the ExecutionContext is strongly discouraged. I would either look into using the Java scheduled executor, as you stated, or look into using Akka, as @alex23 recommended.
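To see why the original snippet fails: `Await.ready` throws a `TimeoutException` when the awaited future does not complete within the given time, and inside a future body that exception fails the enclosing future. A small sketch of that behaviour follows; `TimeoutDemo` and its members are illustrative names only, and the 100 ms timeout is arbitrary.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object TimeoutDemo {
  // A promise that nobody ever completes, as in the question.
  val neverCompletes = Promise[Unit]()

  // Await.ready gives up after the timeout and throws TimeoutException;
  // wrapping the call in Try captures the exception instead of letting it escape.
  val outcome = Try(Await.ready(neverCompletes.future, 100.millis))

  // True when the captured failure is the expected TimeoutException.
  def timedOut: Boolean = outcome match {
    case Failure(_: TimeoutException) => true
    case _                            => false
  }
}
```

Here `TimeoutDemo.timedOut` evaluates to `true`, which is why the version above wraps the wait in `Try` before returning the value.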

cmbaxter
  • 5
    `Await.ready` uses `blocking`, so if you're doing work during the five seconds, at least the underlying pool can spin up a thread for it. – som-snytt Jun 23 '13 at 17:54
  • 2
    The imports needed for this (or the original question) to work would be nice to see. I think they are scala.concurrent.duration._ and scala.concurrent._ – akauppi Apr 14 '16 at 10:10
2

All the other solutions either use Akka or block a thread per delayed task. A better solution (unless you are already using Akka) is to use Java's ScheduledThreadPoolExecutor. Here's an example of a Scala wrapper for that:

https://gist.github.com/platy/8f0e634c64d9fb54559c
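For readers who cannot follow the link, here is a rough sketch of what such a wrapper could look like. It is not the code from the gist: `ScheduledDelay`, the single-thread pool size, and the thread name are all assumptions made for illustration.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

// Hypothetical wrapper around ScheduledThreadPoolExecutor.
object ScheduledDelay {
  // One daemon thread is enough here: it only hands the body over to the
  // ExecutionContext once the delay has elapsed.
  private val scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "scheduled-delay")
      t.setDaemon(true)
      t
    }
  })

  // Runs `body` on `ec` after `delay` and exposes the outcome as a Future.
  def apply[T](delay: FiniteDuration)(body: => T)(implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = ec.execute(new Runnable {
        def run(): Unit = { promise.complete(Try(body)); () }
      })
    }, delay.length, delay.unit)
    promise.future
  }
}
```

With an implicit `ExecutionContext` in scope, `ScheduledDelay(5.seconds)(println("Hello"))` would be used the same way as the Timer-based helpers in the other answers, and no thread sleeps while waiting.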

MikeB
  • 2
    My answer shows how to do it without Akka and without blocking any thread, and was posted more than one year ago – Régis Jean-Gilles Feb 17 '15 at 08:13
  • Thanks I must have missed your answer, I like it, it's pretty rare that a thread pool for execution would be needed such as in my solution. – MikeB Feb 19 '15 at 18:51
-6

The shortest solution for this is probably to use scala-async:

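import scala.async.Async.async
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

def delay[T](value: T, t: Duration): Future[T] = async {
  Thread.sleep(t.toMillis)
  value
}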

Or, in case you want delayed execution of a block:

def delay[T](t: Duration)(block: => T): Future[T] = async {
  Thread.sleep(t.toMillis)
  block
}
pjan
  • 2
    This is a very inadequate solution. As already mentioned, this will block a thread. – Régis Jean-Gilles Feb 17 '15 at 08:32
  • 1
    Two things: Aren't you missing a nested await { ... } in both cases? That would stop the thread from blocking and cause an actual delayed-future. Alternatively, why not just construct Future with Thread.sleep(...) call wrapped in a blocking { ... } block, to guard against potential deadlocks? – Darren Bishop Apr 28 '15 at 15:30
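The second suggestion in the comment above, a plain `Future` with the sleep wrapped in `blocking`, might look roughly like the sketch below. `SleepingDelay` is a hypothetical name. Note that this still parks a thread for the whole delay; `blocking` only signals the ExecutionContext so that it may start a compensating thread.

```scala
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper: delays by sleeping inside the future itself.
object SleepingDelay {
  def apply[T](delay: FiniteDuration)(body: => T)(implicit ec: ExecutionContext): Future[T] =
    Future {
      // Marks the sleep as blocking so the pool can compensate if it supports that.
      blocking(Thread.sleep(delay.toMillis))
      body
    }
}
```

This is fine for a handful of delayed tasks, but the Timer-based or executor-based approaches in the other answers scale better because they do not hold a thread per pending task.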