11

This is a followup to my previous question.

Suppose I have a task, which executes an interruptible blocking call. I would like to run it as a Future and cancel it with failure method of Promise.

I would like the cancel to work as follows:

  • If one cancels the task before it finished I would like the task to finish "immediately", interrupting the blocking call if it has already started and I would like the Future to invoke onFailure.

  • If one cancels the task after the task finished I would like to get a status saying that the cancel failed since the task already finished.

Does it make sense? Is it possible to implement in Scala? Are there any examples of such implementations?

Community
  • 1
  • 1
Michael
  • 41,026
  • 70
  • 193
  • 341

4 Answers4

13

scala.concurrent.Future is read-only, so one reader cannot mess things up for the other readers.

It seems like you should be able to implement what you want as follows:

def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
  val p = Promise[T]()
  val f = p.future
  p tryCompleteWith Future(fun(f))
  (f, () => p.tryFailure(new CancellationException))
}

val (f, cancel) = cancellableFuture( future => {
  while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag

  result  // when we're done, return some result
})

val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)
Viktor Klang
  • 26,479
  • 7
  • 51
  • 68
  • Thanks. Suppose I execute some _interruptible_ blocking call instead of calculation. How can I modify the code above to interrupt the thread? – Michael Apr 16 '13 at 06:12
  • 2
    You'd have to add a synchronized var that sets the current thread under a lock when the computation starts, and then takes the lock at the end and clears the var. And the cancel would take the lock and call interrupt on the set Thread, if any, or bail out if null. – Viktor Klang Apr 16 '13 at 18:01
  • should it be `while(!future.isCompleted && moreWork) continueCalculation`? – sourcedelica Apr 17 '13 at 01:54
  • Cool - I figured but i was coding it up as an exercise and wanted to be sure :) The future that is passed into `fun` as a flag, is there anything special about it being a future other than it is read only and atomic? For example, could you use an AtomicBoolean instead (except it is read/write)? – sourcedelica Apr 17 '13 at 13:26
  • 1
    sourcedelica: An AtomicBoolean is mutable so that might not be the best choice, the reason for the Future is that it's already allocated and doesn't interfere in any way. – Viktor Klang Apr 17 '13 at 18:49
  • Hi @ViktorKlang, is it possible the future in the output on cancellable is not the interesting one? I mean did you mean `val f = Future(fun(p.future)) ; p tryCompleteWith f` – Francisco López-Sancho Apr 25 '20 at 00:22
  • @FranciscoLópez-Sancho I'm not sure I understand what you mean—could you clarify a bit? – Viktor Klang Apr 25 '20 at 21:21
  • Sorry @ViktorKlang, of course. I kind of feel that the future one wants back from the `cancellabeFuture` is not the future from the promise `val f = p.future` as this future is only used to trigger the cancellation. From inside `cancellableFuture`. on `while(!future.isCompleted) continueCalculation`. This future is essential to stop the calculation but the calculation itself is wrapped in other future `p tryCompleteWith Future(fun(f))`. And this `Future(fun(f))` is the one that we may want to get out of the method `cancellableFuture` maybe to add a callback to or something else? – Francisco López-Sancho Apr 26 '20 at 00:55
  • 1
    @FranciscoLópez-Sancho In the code, the Future returned will contain the result of the computation, or a CancellationException. – Viktor Klang Apr 26 '20 at 20:37
  • awesome! :) Thank you for letting me know @ViktorKlang – Francisco López-Sancho Apr 26 '20 at 22:08
  • @FranciscoLópez-Sancho you're most welcome—I'm happy that so many developers use Future & Promise. :) – Viktor Klang Apr 27 '20 at 10:48
11

Here is the interruptable version of Victor's code per his comments (Victor, please correct me if I misinterpreted).

object CancellableFuture extends App {

  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    p tryCompleteWith Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        //Deal with interrupted flag of this thread in desired
      }
    }

    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryFailure(new CancellationException)
    })
  }

  val (f, cancel) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)

    latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptable
    println("latch timed out")

    42  // Completed
  }

  f.onFailure { case ex => println(ex.getClass) }
  f.onSuccess { case i => println(i) }

  Thread.sleep(6000)   // Set to less than 5000 to cancel

  val wasCancelled = cancel()

  println("wasCancelled: " + wasCancelled)
}

With Thread.sleep(6000) the output is:

latch timed out
42
wasCancelled: false

With Thread.sleep(1000) the output is:

wasCancelled: true
class java.util.concurrent.CancellationException
sourcedelica
  • 23,940
  • 7
  • 66
  • 74
6

Twitter's futures implement cancellation. Have a look here:

https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

Line 563 shows the abstract method responsible for this. Scala's futures currently do not support cancellation.

gzm0
  • 14,752
  • 1
  • 36
  • 64
  • Does it still? You linked to master, so the code has shifted since then. – Basil May 30 '20 at 19:15
  • Oh dear... It seems that `raise` might have replaced this functionality: https://twitter.github.io/util/docs/com/twitter/util/Future.html#raise(interrupt:Throwable):Unit – gzm0 Jun 02 '20 at 07:14
2

You can use Monix library instead of Future

https://monix.io

Amir Hossein Javan
  • 415
  • 1
  • 4
  • 12