58

Java's Future has a cancel method, which can interrupt the thread that runs the Future's task. For example, if I wrap an interruptible blocking call in a Java Future, I can interrupt it later.
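
For reference, the Java behaviour described above can be reproduced from Scala roughly as follows. This is a minimal sketch: `JavaCancelDemo`, the single-thread executor and the `Thread.sleep` call (standing in for any interruptible blocking call) are assumptions made for illustration.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

object JavaCancelDemo {
  // Returns true if the blocking task saw an InterruptedException after cancel(true).
  def run(): Boolean = {
    val executor = Executors.newSingleThreadExecutor()
    val started = new CountDownLatch(1)
    val interrupted = new CountDownLatch(1)
    try {
      val javaFuture = executor.submit(new Callable[Unit] {
        override def call(): Unit = {
          started.countDown()
          try Thread.sleep(60000) // stands in for an interruptible blocking call
          catch { case _: InterruptedException => interrupted.countDown() }
        }
      })
      started.await()         // wait until the task is actually running
      javaFuture.cancel(true) // true = interrupt the thread that runs the task
      interrupted.await(5, TimeUnit.SECONDS)
    } finally executor.shutdownNow()
  }
}
```

The latches only make the demonstration deterministic; the relevant call is `cancel(true)`.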

Scala Future provides no cancel method. Suppose I wrap an interruptible blocking call in a Scala Future. How can I interrupt it?

Michael

4 Answers

34

This is not yet a part of the Futures API, but may be added as an extension in the future.

As a workaround, you could use `firstCompletedOf` to combine two futures: the future you want to cancel and a future that comes from a custom Promise. You can then cancel the resulting future by failing the promise:

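import scala.concurrent.{ExecutionContext, Future, Promise}

def cancellable[T](f: Future[T])(customCode: => Unit)(implicit ec: ExecutionContext): (() => Unit, Future[T]) = {
  val p = Promise[T]()
  val first = Future.firstCompletedOf(Seq(p.future, f))
  val cancellation: () => Unit = { () =>
    // Run the custom cancellation logic once the combined future has failed
    // (`onFailure` was deprecated in Scala 2.12 and removed in 2.13).
    first.failed.foreach(_ => customCode)
    // Fail the promise; `tryFailure` keeps a second call to cancel from throwing.
    p.tryFailure(new Exception)
  }
  (cancellation, first)
}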

Now you can call this on any future to obtain a "cancellable wrapper". Example use-case:

val f = callReturningAFuture()
val (cancel, f1) = cancellable(f) {
  cancelTheCallReturningAFuture()
}

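// somewhere else in code (Await.result requires a timeout; Duration.Inf waits indefinitely)
if (condition) cancel() else println(Await.result(f1, scala.concurrent.duration.Duration.Inf))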
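
A self-contained way to try the wrapper is sketched below. It assumes that the custom cancellation code sets an `AtomicBoolean` that the future's body polls; the names `CancellableWrapperDemo` and `stop`, the step count and the timeouts are hypothetical. The wrapper is repeated inside the object only so that the example compiles on its own.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object CancellableWrapperDemo {
  import ExecutionContext.Implicits.global

  // Same idea as the wrapper above, repeated to keep the example self-contained.
  def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = {
    val p = Promise[T]()
    val first = Future.firstCompletedOf(Seq(p.future, f))
    val cancellation: () => Unit = { () =>
      first.failed.foreach(_ => customCode)
      p.tryFailure(new Exception("cancelled"))
    }
    (cancellation, first)
  }

  // Returns (did the wrapper fail?, how many steps the body completed before stopping).
  def run(): (Boolean, Int) = {
    val stop = new AtomicBoolean(false) // hypothetical flag polled by the body
    val body = Future {
      var steps = 0
      while (!stop.get && steps < 1000) { Thread.sleep(10); steps += 1 }
      steps
    }
    val (cancel, wrapped) = cancellable(body) { stop.set(true) }
    cancel()
    val wrapperFailed = Try(Await.result(wrapped, 5.seconds)).isFailure
    val steps = Await.result(body, 15.seconds)
    (wrapperFailed, steps)
  }
}
```

The wrapped future fails as soon as `cancel()` is called, while the body only stops at its next check of the flag, so it may still have completed some steps.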

EDIT:

For a detailed discussion of cancellation, see Chapter 4 of the book Learning Concurrent Programming in Scala.

axel22
  • 3
    Yes, Promises are the producing end of futures in scala, so that's where you control the outcome. Some links: http://www.scala-lang.org/api/current/index.html#scala.concurrent.Promise, http://docs.scala-lang.org/overviews/core/futures.html#promises – rompetroll Apr 15 '13 at 07:54
  • No, you have to add this bit of custom cancellation/interruption logic in the `customCode` above. – axel22 Apr 15 '13 at 08:04
  • 1
    Thanks. Why does not the standard Scala library include this `cancel` ? – Michael Apr 15 '13 at 08:27
  • 8
    Unfortunately this is not 100% reliable. Between the time you call `cancel` and the time `customCode` actually stops the body of the future (say, for example, that `customCode` sets a boolean flag that the future's body checks to know whether to abort), anything might happen. In particular, the future's body might start to execute. The end result: the future returned by `cancellable` says it was cancelled, but the future's body actually executed. That's a real problem as soon as the future's body performs any side effect, and it makes this code subtly dangerous to use. – Régis Jean-Gilles Apr 15 '13 at 08:29
  • 1
    What you can do instead then is to add a callback to the cancellable future to interrupt the thread - the idea is to interrupt the thread only after the future is cancelled. I'll update the answer. – axel22 Apr 15 '13 at 08:35
  • 1
    This won't do it either. The problem is still the same: between the moment that you decided that you want to cancel the future (that is, in your example `condition` evaluated to `true`) and the moment you actually interrupt the thread, it might be too late and the original future's body might already have executed. – Régis Jean-Gilles Apr 15 '13 at 08:39
  • 1
    True - you cannot have the cancellation of the future be atomically done with thread interruption - the API and the implementation do not allow this. This is a workaround that can help in some situations, though. – axel22 Apr 15 '13 at 08:40
  • By the way, if the call is interruptible, doesn't the library you're using to make that call already return some kind of future which you can cancel? (Specifically, I'm thinking of the few things I've heard about Scala-IO) – F.X. Apr 15 '13 at 08:47
  • 9
    Providing `Future.cancel` is not a good idea due to the reasons discussed; another problem is that a Future is a shared read-only handle and as such it should not provide methods which interfere with other readers. What you can do is to pass a Future into the code which you want to run and have that code check the Future periodically, then you have a principled way of interrupting the computation by completing the corresponding Promise. – Roland Kuhn Apr 15 '13 at 12:51
  • 5
    `Future.cancel` would be really helpful. I recently ran into a problem when I created futures that connect to different databases to collect stats. Some of the databases had a problem that made the connection hang, so the future just ran forever. Because the stat collection ran periodically, my application ended up eating all the CPUs available to it. I realized what the problem was and tried to find a way to time out the future until the databases could be fixed, but so far I have found no good answer :( – Sheng Dec 26 '13 at 19:20
  • but it will not cancel original future – dmgcodevil Nov 05 '18 at 00:58
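
The approach Roland Kuhn describes in the comments (pass a Future into the running code and have that code check it periodically) could look roughly like the sketch below. All names (`CooperativeCancelDemo`, `countUntilCancelled`, `cancelSwitch`) are hypothetical, and `Thread.sleep(10)` stands in for one unit of real work.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object CooperativeCancelDemo {
  import ExecutionContext.Implicits.global

  // The worker receives a read-only cancellation signal and polls it between steps.
  def countUntilCancelled(cancelled: Future[Unit], maxSteps: Int): Future[Int] = Future {
    var steps = 0
    while (!cancelled.isCompleted && steps < maxSteps) {
      Thread.sleep(10) // stands in for one unit of real work
      steps += 1
    }
    steps
  }

  def run(): Int = {
    val cancelSwitch = Promise[Unit]() // only the holder of the promise can cancel
    val work = countUntilCancelled(cancelSwitch.future, maxSteps = 1000)
    Thread.sleep(50)
    cancelSwitch.trySuccess(())        // request cancellation
    Await.result(work, 15.seconds)     // steps completed before the worker noticed
  }
}
```

This only stops the computation at the points where it checks the signal; it does not interrupt a call that is already blocked.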
11

I haven't tested this, but it expands on Pablo Francisco Pérez Hidalgo's answer. Rather than blocking a thread while waiting for the Java Future, it completes an intermediate Promise.

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, Promise}
import scala.util.Try

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
  private val promise = Promise[T]()

  def future = promise.future

  private val jf: FutureTask[T] = new FutureTask[T](
    new Callable[T] {
      override def call(): T = todo
    }
  ) {
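    // Explicit Unit: without it the inferred result type is that of promise.complete, which does not match done().
    override def done(): Unit = promise.complete(Try(get()))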
  }

  def cancel(): Unit = jf.cancel(true)

  executionContext.execute(jf)
}

object Cancellable {
  def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
    new Cancellable[T](executionContext, todo)
}
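
A possible way to exercise this class is sketched below. The class is repeated in compact form inside a hypothetical `PromiseCancellableDemo` object so that the example compiles on its own; the latch, the `Thread.sleep` call and the timeout are assumptions made for the demonstration.

```scala
import java.util.concurrent.{Callable, CancellationException, CountDownLatch, FutureTask}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object PromiseCancellableDemo {
  // Compact copy of the class above, repeated to keep the example self-contained.
  class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
    private val promise = Promise[T]()
    def future: Future[T] = promise.future
    private val jf: FutureTask[T] = new FutureTask[T](
      new Callable[T] { override def call(): T = todo }
    ) {
      // Called by FutureTask on completion, failure or cancellation.
      override def done(): Unit = promise.complete(Try(get()))
    }
    def cancel(): Unit = jf.cancel(true)
    executionContext.execute(jf)
  }

  // Returns true if the Scala future fails with CancellationException after cancel().
  def run(): Boolean = {
    val started = new CountDownLatch(1)
    val task = new Cancellable[Int](ExecutionContext.global, {
      started.countDown()
      Thread.sleep(60000) // stands in for an interruptible blocking call
      42
    })
    started.await() // make sure the task is running before cancelling it
    task.cancel()
    Await.ready(task.future, 5.seconds).value match {
      case Some(Failure(_: CancellationException)) => true
      case _ => false
    }
  }
}
```

After `cancel()`, `get()` inside `done()` throws `CancellationException`, which `Try` captures, so the Scala future completes with that failure.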
nightingale
  • Great optimisation! More Java-style code, but the reduction in thread use is worth it. I love Stack Overflow synergies – Pablo Francisco Pérez Hidalgo Oct 11 '16 at 21:11
  • 1
    @PabloFranciscoPérezHidalgo Do you mind if I publish an adaptation of this as a library? You can find my adaptation [here](https://github.com/NthPortal/cancellable-task) – NthPortal Jan 26 '17 at 02:21
  • @NthPortal Consider it open source. So, feel free! It would be nice if the authorship of the idea was collected in the repo :D (You could include my twitter handle "pfcoperez" or stack overflow profile link - as well as nightingale's - ) – Pablo Francisco Pérez Hidalgo Jan 26 '17 at 08:19
  • @NthPortal By the way, the initial version is already in a public repo https://github.com/Stratio/common-utils/blob/master/src/main/scala/com/stratio/common/utils/concurrent/Cancellable.scal , you can commit the updated one there too ;-) – Pablo Francisco Pérez Hidalgo Jan 26 '17 at 08:23
8

By cancelling I guess you would like to violently interrupt the future.

Found this segment of code: https://gist.github.com/viktorklang/5409467

Did a few tests and seems to work fine!

Enjoy :)
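
The sketch below is not the code from the linked gist. It is an independent illustration of an interrupt-based approach with the same shape as described in the comments (a function that returns the future plus a canceller); every name in it (`InterruptibleFutureDemo`, `interruptibleFuture`, `worker`, `lock`) is hypothetical.

```scala
import java.util.concurrent.{CancellationException, CountDownLatch, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object InterruptibleFutureDemo {
  // Returns the future together with a function that cancels it by interrupting its thread.
  def interruptibleFuture[T](body: => T)(implicit ec: ExecutionContext): (Future[T], () => Boolean) = {
    val promise = Promise[T]()
    val lock = new Object
    var worker: Thread = null // guarded by `lock`
    ec.execute(new Runnable {
      override def run(): Unit = {
        // Skip the body entirely if cancellation won the race.
        val proceed = lock.synchronized {
          if (promise.isCompleted) false
          else { worker = Thread.currentThread(); true }
        }
        if (proceed) {
          try promise.tryComplete(Try(body))
          catch { case e: InterruptedException => promise.tryFailure(e) }
          finally lock.synchronized {
            worker = null
            Thread.interrupted() // clear a late interrupt so the pool thread stays usable
          }
        }
      }
    })
    val cancel: () => Boolean = () => lock.synchronized {
      val cancelled = promise.tryFailure(new CancellationException("cancelled"))
      if (cancelled && worker != null) worker.interrupt()
      cancelled
    }
    (promise.future, cancel)
  }

  // Returns true if the future failed and the blocked body was actually interrupted.
  def demo(): Boolean = {
    import ExecutionContext.Implicits.global
    val started = new CountDownLatch(1)
    val stopped = new CountDownLatch(1)
    val (future, cancel) = interruptibleFuture {
      started.countDown()
      try Thread.sleep(60000) finally stopped.countDown()
    }
    started.await()
    cancel()
    val failed = Await.ready(future, 5.seconds).value.exists(_.isFailure)
    failed && stopped.await(5, TimeUnit.SECONDS)
  }
}
```

As noted in the comments, this cancels only the one future it created; futures derived from it with `map` or `flatMap` run their own bodies on other threads and are not interrupted.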

George Pligoropoulos
  • Do you have your code segment using this function? I'm having some trouble understanding how to use it. – infomofo Jan 28 '14 at 16:04
  • Just call the function. It returns two values: the future and the canceller (a function that you can call to cancel the running future). There is nothing more to it, really. You don't have to understand it in order to use it; just copy and paste it. Hope this helps a little bit – George Pligoropoulos Jan 28 '14 at 19:05
  • 1
    The answer here might help a little with understanding how to use this: http://stackoverflow.com/a/16050595/237399 – Eric G Aug 05 '14 at 00:16
  • Futures are really useful when you combine them with flatMap, map, filter, etc. The cancellation approach above does not work when you combine futures. For example: `val f = Future { /* blocking interruptible computation */ }.map(res => { /* one more blocking interruptible computation */ })`. How can I cancel future `f`? – mtomy Apr 23 '15 at 19:57
  • In general I agree that cancelling is possible as shown, but NOT RECOMMENDED – George Pligoropoulos Apr 24 '15 at 10:51
  • This is the only way that actually works! All other examples mentioned here don't work (the Future is not being terminated). Yes, interrupting threads is not a good idea, but it's the other question. See proof here: https://pastebin.com/5HTGkJGt – Mitrakov Artem Sep 19 '18 at 16:03
3

I think it is possible to reduce the complexity of the implementations provided by making use of Java's `Future` interface and its implementations.

`Cancellable` builds a Java future, which is the one that its `cancel` method cancels. A second, Scala future waits for the Java future to complete and serves as the observable, read-only interface:

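import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, Future}

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {

  // The Java task that actually runs `todo`; this is what gets cancelled.
  private val jf: FutureTask[T] = new FutureTask[T](
    new Callable[T] {
      override def call(): T = todo
    }
  )

  executionContext.execute(jf)

  // Named implicit so that the Future below picks up the same execution context.
  private implicit val ec: ExecutionContext = executionContext

  // Read-only view of the result; it occupies a second thread while it waits on the Java task.
  val future: Future[T] = Future {
    jf.get()
  }

  def cancel(): Unit = jf.cancel(true)

}

object Cancellable {
  def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
    new Cancellable[T](executionContext, todo)
}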
  • 1
    I haven't tested this yet, but it looks great. But maybe `jf.get` call should be wrapped into `scala.concurrent.blocking` block for good measure. – nightingale Oct 11 '16 at 19:42
  • @nightingale Thanks! I use it in production https://github.com/Stratio/Common-utils/blob/master/src/main/scala/com/stratio/common/utils/concurrent/Cancellable.scala at https://github.com/Stratio/Crossdata/blob/6e05a8aede07be477b5f374091f875c401b3c79e/server/src/main/scala/com/stratio/crossdata/server/actors/JobActor.scala#L167. `jf.get` is called within a block passed to a `Future` constructor, so IMHO there is no danger from potential blocking, is there? – Pablo Francisco Pérez Hidalgo Oct 11 '16 at 19:49
  • 1
    @nightingale I've checked this question http://stackoverflow.com/questions/19681389/use-case-of-scala-concurrent-blocking and it seems `blocking` might apply. I'll check it out, thanks for your suggestion :) – Pablo Francisco Pérez Hidalgo Oct 11 '16 at 19:52
  • Yeah, as the link you found says too, I think it's exactly the case to use the `blocking` hint so that underlying thread pool temporarily increases the number of workers. You're welcome! – nightingale Oct 11 '16 at 20:00
  • @nightingale That's great! I wasn't concerned about the number of executors because, given the very specific characteristics of the code where I use it, I had to use a custom `ExecutionContext` which provides a whole new thread for each execution: `class ProlificExecutor extends Executor { override def execute(command: Runnable): Unit = new Thread(command).start() }` – Pablo Francisco Pérez Hidalgo Oct 11 '16 at 20:11
  • Oh, yeah, that would obviate this particular concern, indeed. Well, usefulness of the blocking hint is limited as it is, being applicable only to the default global ExecutionContext, so... – nightingale Oct 11 '16 at 20:20
  • But I wonder if we can have the best of both worlds and avoid always stealing a thread from the pool just so that we can wait on the Java Future, even if nobody called get()/Await.result() on the outer Scala Future. If instead we use a `Promise` and complete it from `FutureTask.done()`... – nightingale Oct 11 '16 at 20:42
  • 1
    Check out my answer for an implementation of this idea. I've intentionally kept it so that it remains a drop-in replacement for your class, but now it should use half as many threads. Hope it works and will be of use! – nightingale Oct 11 '16 at 21:00
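
The `blocking` hint discussed in these comments could be applied to the waiting future roughly as follows. This is a minimal sketch: `BlockingHintDemo`, the trivial task and the timeout are assumptions, and the global `ExecutionContext` is used because that is where the comments say the hint has an effect.

```scala
import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object BlockingHintDemo {
  def run(): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val jf = new FutureTask[Int](new Callable[Int] {
      override def call(): Int = 21 * 2 // stands in for the real work
    })
    ec.execute(jf)
    // `blocking` marks the wait so that a pool that supports the hint can add threads.
    val observed: Future[Int] = Future { blocking { jf.get() } }
    Await.result(observed, 5.seconds)
  }
}
```

The hint does not change the result; it only tells the execution context that the enclosed call may block.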