32

I know that CompletableFuture, by design, does not control its execution with interruptions, but I suppose some of you have run into this problem. CompletableFutures are a very good way to compose asynchronous execution, but when you want the underlying execution to be interrupted or stopped once the future is canceled, how do you do that? Or must we just accept that a canceled or manually completed CompletableFuture has no effect on the thread that is working to complete it?

That is, in my opinion, obviously useless work that takes up an executor worker's time. What approach or design might help in this case?

UPDATE

Here is a simple test for this

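import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Test; // assuming JUnit 4

public class SimpleTest {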

  @Test
  public void testCompletableFuture() throws Exception {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());

    bearSleep(1);

    //cf.cancel(true);
    cf.complete(null);

    System.out.println("it should die now already");
    bearSleep(7);
  }

  public static void longOperation(){
    System.out.println("started");
    bearSleep(5);
    System.out.println("completed");
  }

  private static void bearSleep(long seconds){
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      System.out.println("OMG!!! Interrupt!!!");
    }
  }
}
Mansuro
vach
  • 2
    I wonder if we can implement something like its static method supplyAsync, but with some additional logic that checks whether the future has been completed or cancelled and, if so, interrupts the thread that was executing the task... – vach Mar 12 '15 at 15:31
  • Please see my answer to related question: https://stackoverflow.com/questions/23301598/transform-java-future-into-a-completablefuture/37324409#37324409 In the code mentioned there, the CompletionStage behavior is added to RunnableFuture subclass (used by ExecutorService implementations), so you may interrupt it in the right way. – Valery Silaev May 19 '16 at 13:38

7 Answers

21

A CompletableFuture is not related to the asynchronous action that may eventually complete it.

Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()).

There may not even be a separate thread working on completing it (there may even be many threads working on it). Even if there is, there's no link from a CompletableFuture to any thread that has a reference to it.

As such, there's nothing you can do through CompletableFuture to interrupt any thread that may be running some task that will complete it. You'll have to write your own logic which tracks any Thread instances which acquire a reference to the CompletableFuture with the intention to complete it.
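
To make the point above concrete, here is a minimal sketch, not part of the original answer, that cancels a future created with runAsync and then checks whether the worker ever saw an interrupt. The class and method names are hypothetical, and the 500 ms sleep only stands in for a long operation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, for illustration only.
final class CancelDoesNotInterrupt {

    /** Returns true if the worker saw an interrupt after the future was cancelled. */
    static boolean workerWasInterrupted() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
            started.countDown();
            try {
                TimeUnit.MILLISECONDS.sleep(500); // stands in for a long operation
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
        });

        try {
            started.await();   // make sure the worker is running
            cf.cancel(true);   // completes cf with a CancellationException, nothing more
            finished.await();  // the worker still runs to the end
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        // prints "worker interrupted: false"
        System.out.println("worker interrupted: " + workerWasInterrupted());
    }
}
```

The mayInterruptIfRunning argument makes no difference here, which matches the Javadoc quoted above.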


Here's an example of the type of execution I think you could get away with.

public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(1);
    CompletableFuture<String> completable = new CompletableFuture<>();
    Future<?> future = service.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                if (Thread.interrupted()) {
                    return; // remains uncompleted
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return; // remains uncompleted
                }
            }
            completable.complete("done");
        }
    });

    Thread.sleep(2000);

    // not atomic across the two
    boolean cancelled = future.cancel(true);
    if (cancelled)
        completable.cancel(true); // may not have been cancelled if execution has already completed
    if (completable.isCancelled()) {
        System.out.println("cancelled");
    } else if (completable.isCompletedExceptionally()) {
        System.out.println("exception");
    } else {
        System.out.println("success");
    }
    service.shutdown();
}

This assumes that the task being executed is set up to handle interruptions correctly.
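
As a rough illustration of what "handling interruptions correctly" can mean for work that never blocks, the sketch below polls the interrupt flag inside a busy loop. It is an assumption-laden example rather than the answer's own code: the class name, the method name, and the empty counting loop are all made up for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, for illustration only.
final class InterruptAwareTask {

    /** Returns true if the busy loop noticed the interrupt and stopped. */
    static boolean stopsOnInterrupt() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean stopped = new AtomicBoolean(false);
        try {
            Future<?> future = service.submit(() -> {
                started.countDown();
                long counter = 0;
                // A pure computation never throws InterruptedException,
                // so the loop has to poll the interrupt flag itself.
                while (!Thread.currentThread().isInterrupted()) {
                    counter++;
                }
                stopped.set(true);
            });
            started.await();
            future.cancel(true); // sets the interrupt flag of the worker thread
            service.shutdown();
            return service.awaitTermination(5, TimeUnit.SECONDS) && stopped.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("loop stopped on interrupt: " + stopsOnInterrupt());
    }
}
```

Without the isInterrupted() check the loop would keep running after future.cancel(true), because nothing in it would ever observe the flag.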

MartyIX
Sotirios Delimanolis
  • Well, I knew that, but my question is about the logic to link them together... For example, suppose I have my own static factory method for these cases: instead of supplyAsync I'll use some custom factory method. Don't you think it's possible to link the CF to the thread that executes it? – vach Mar 12 '15 at 15:49
  • 1
    @Vach You'd have to guarantee that the only `Thread`s that get access to the `CompletableFuture` are the ones that go through your custom factory method. That's not very feasible, imo. The important thing here is that the CF is not related to any one `Thread`. If you need to cancel some thread's work, you should have submitted the work through an `ExecutorService`, retrieved the `Future` and cancelled that. The `Runnable` or `Callable` submitted can have a reference to the CF. You won't be cancelling the CF, you'll be cancelling the task through the `Future`. – Sotirios Delimanolis Mar 12 '15 at 15:53
  • @Vach You can encapsulate the logic by joining the `Future` and `CompletableFuture` and cancelling both. Note that this won't be atomic. – Sotirios Delimanolis Mar 12 '15 at 15:53
  • The thing is that I need the flexibility of chaining operations that CompletableFutures provide, but I don't want useless operations to keep wasting my CPU cycles after I cancel them... that's pointless. – vach Mar 12 '15 at 15:57
  • So I can get a CompletableFuture from a Future? And when the CF is cancelled, call cancel on the Future as well? Could you please give me a clue how to do that? – vach Mar 12 '15 at 15:57
  • I'm even ready to extend CF and add a dependency on F if that's the only way :( – vach Mar 12 '15 at 15:59
  • I thought that RxJava would handle this problem properly, but it seems the same problem exists there too... – vach Mar 12 '15 at 15:59
  • @Vach I imagine you're thinking of a case where you use `runAsync` and want to cancel that task? However, that's not the only use case for `CompletableFuture`. You can launch 10 threads which all work towards completing it. Interrupting those threads is up to you. The threads themselves can't know they have a reference to a CF and the CF definitely can't know. Only the developer can. – Sotirios Delimanolis Mar 12 '15 at 16:02
  • @Vach Extending `Future` and `CompletableFuture` won't be very useful if you plan to use JDK libraries to submit the work (`ExecutorService`s and such). – Sotirios Delimanolis Mar 12 '15 at 16:03
  • I don't really see why using ExecutorService won't make it useful... The specific problem is that the code might run the same task 100 times a second, and I want the other 99 to be cancelled if they didn't make it in time... Anyway, I think it's not reasonable to let a useless thread consume resources... – vach Mar 12 '15 at 16:08
  • @Vach I completely agree with you. Don't waste resources. But you won't be able to solve this with the existing CF. You can extend CF and wrap a `Future`, but I don't believe you'll have access to a lot of the components that handle completion (a lot of it is `private`) and I doubt you'll be able to perform the cancellation atomically. – Sotirios Delimanolis Mar 12 '15 at 16:13
  • 1
    @Vach I'll write an example of what I mean. Give me a few. – Sotirios Delimanolis Mar 12 '15 at 16:17
  • That's what I was afraid of; I'll have to dig through this code and reinvent the wheel... I wonder how other people haven't had this problem... OK, waiting for it :) – vach Mar 12 '15 at 16:17
  • 1
    @Vach Updated with an extremely simple example with only a single thread executing work that may set the `CompletableFuture`. – Sotirios Delimanolis Mar 12 '15 at 16:41
  • Thanks for the example, but you are cancelling the Future, not the CompletableFuture... My aim is to make CompletableFuture.cancel interrupt the underlying thread if it has not finished... or did I misunderstand something? – vach Mar 13 '15 at 05:24
  • Eventually I'll expose a factory method of my own that hands out a CompletableFuture (though the underlying implementation might be my own) that interrupts the thread, if there is one, when it is cancelled... This means I need to somehow make the CF aware of its thread, or have some callback that finds that thread and calls interrupt when the CF is cancelled... – vach Mar 13 '15 at 05:37
  • 2
    @Vach So your exact aim is unachievable since there are no underlying threads to a `CompletableFuture`. The association you're making between your threads and a `CompletableFuture` is yours alone. `CompletableFuture` was not intended that way. The closest you're going to get with a `Future` and a `CompletableFuture` is what I've presented above. Cancel the `Future`. If it was successful, cancel the associated `CompletableFuture`. Obviously, you might be able to get the behavior you want by writing your own `CompletableFuture`, but that is not an easy task. – Sotirios Delimanolis Mar 13 '15 at 16:00
  • 1
    @SotiriosDelimanolis Thanks for your explanation. In your example you use `Thread.sleep()` which already checks for interruption, so using `Thread.sleep()` might not be the better way of explaining your point (though the explanation is correct). For example, comment `if (Thread.interrupted()) {..}` and the program will get interrupted just the same. I think instead of `Thread.sleep()` the code should read `doIninterrumpibleComputation()`. – Daniel Apr 26 '17 at 22:07
3

What about this?

public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {

    final ExecutorService executorService = Executors.newFixedThreadPool(1);

    final CompletableFuture<T> cf = new CompletableFuture<T>() {
        @Override
        public boolean complete(T value) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.completeExceptionally(ex);
        }
    };

    // submit task
    executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    return cf;
}

Simple Test:

    CompletableFuture<String> cf = supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (Exception e) {
            System.out.println("got interrupted");
            return "got interrupted";
        }
        System.out.println("normal complete");
        return "normal complete";
    });

    cf.complete("manual complete");
    System.out.println(cf.get());

I don't like the idea of having to create an Executor service every time, but maybe you can find a way to reuse the ForkJoinPool.

Ruben
  • 1
    Glad you find it useful. If what you are trying to achieve though can be implemented via a timeout mechanism, maybe you find this solution more interesting: http://stackoverflow.com/questions/23575067/timeout-with-default-value-in-java-8-completablefuture/24457111#24457111 – Ruben Mar 14 '15 at 15:39
2

If you use

cf.get();

instead of

cf.join();

the thread waiting on the completion can be interrupted. This bit me badly, so I'm just putting it out there. You'd then need to propagate the interruption further, or call cf.cancel(...), to really finish the execution.
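
A small sketch of the get() side of this, under the assumption that the future is never completed by anyone: the waiting thread is interrupted and get() surfaces that as an InterruptedException. The class and method names are hypothetical. join(), by contrast, does not declare InterruptedException, so a caller cannot catch the interruption there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, for illustration only.
final class GetVersusJoin {

    /** Blocks a thread in get(), interrupts it, and reports what the waiter observed. */
    static String interruptWaiterInGet() {
        CompletableFuture<String> cf = new CompletableFuture<>(); // never completed here
        CountDownLatch entered = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("still waiting");

        Thread waiter = new Thread(() -> {
            entered.countDown();
            try {
                outcome.set("value: " + cf.get());
            } catch (InterruptedException e) {
                outcome.set("interrupted"); // this is where propagation would start
            } catch (ExecutionException e) {
                outcome.set("failed");
            }
        });
        waiter.start();
        try {
            entered.await();
            waiter.interrupt();   // get() reacts to the interrupt
            waiter.join(5000);    // Thread.join, waits for the waiter thread to end
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        // prints "waiter outcome: interrupted"
        System.out.println("waiter outcome: " + interruptWaiterInGet());
    }
}
```

Note that interrupting the waiter says nothing about the thread that is producing the value; that still has to be cancelled separately.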

jontejj
1

I had a similar issue wherein I needed to simulate an InterruptedException.

I mocked the method call that is supposed to return the CompletableFuture, and I put a spy on the return value so that CompletableFuture#get throws the exception.

It worked as I expected, and I was able to test that the code handled the exception correctly.

CompletableFuture spiedFuture = spy(CompletableFuture.completedFuture(null));
when(spiedFuture.get()).thenThrow(new InterruptedException());

when(service.getById(anyString())).thenReturn(spiedFuture);
Wand Maker
1

Here is an ultra-short version (it relies on completeAsync, which is available since Java 9) to create a Future task that can be cancelled:

public static <T> Future<T> supplyAsync(Function<Future<T>, T> operation) {
    CompletableFuture<T> future = new CompletableFuture<>();
    return future.completeAsync(() -> operation.apply(future));
}

The CompletableFuture is passed to the operation Function to be able to check the cancel status of the Future:

Future<Result> future = supplyAsync(task -> {
   while (!task.isCancelled()) {
       // computation
   }
   return result;
});
// later you may cancel
future.cancel(false);
// or retrieve the result
Result result = future.get(5, TimeUnit.SECONDS);

This however does not interrupt the Thread running the operation. If you also want to be able to interrupt the Thread, then you have to store a reference to it and override Future.cancel(..) to interrupt it.

public static <T> Future<T> supplyAsync(Function<Future<T>, T> action) {
    return supplyAsync(action, r -> new Thread(r).start());
}

public static <T> Future<T> supplyAsync(Function<Future<T>, T> action, Executor executor) {

    AtomicReference<Thread> interruptThread = new AtomicReference<>();
    CompletableFuture<T> future = new CompletableFuture<>() {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (!interruptThread.compareAndSet(null, Thread.currentThread()) 
                   && mayInterruptIfRunning) {
                interruptThread.get().interrupt();
            }
            return super.cancel(mayInterruptIfRunning);
        }
    };

    executor.execute(() -> {
        if (interruptThread.compareAndSet(null, Thread.currentThread())) try {
            future.complete(action.apply(future));
        } catch (Throwable e) {
            future.completeExceptionally(e);
        }
    });

    return future;
}

The following test checks that the Thread executing our Function got interrupted:

@Test
void supplyAsyncWithCancelOnInterrupt() throws Exception {
    Object lock = new Object();
    CountDownLatch done = new CountDownLatch(1);
    CountDownLatch started = new CountDownLatch(1);

    Future<Object> future = supplyAsync(m -> {
        started.countDown();
        synchronized (lock) {
            try {
                lock.wait(); // let's get interrupted
            } catch (InterruptedException e) {
                done.countDown();
            }
        }
        return null;
    });

    assertFalse(future.isCancelled());
    assertFalse(future.isDone());

    assertTrue(started.await(5, TimeUnit.SECONDS));
    assertTrue(future.cancel(true));

    assertTrue(future.isCancelled());
    assertTrue(future.isDone());
    assertThrows(CancellationException.class, () -> future.get());
    assertTrue(done.await(5, TimeUnit.SECONDS));
}
benez
1

If you want to react when the underlying execution of a CompletableFuture should be interrupted, you can use the completeExceptionally method to complete the future with an exception. This does not stop the running computation by itself, but any dependent CompletableFuture will be completed exceptionally as well (with the exception wrapped in a CompletionException), and any callback that you have set up with the whenComplete method will be called with the exception.

For example, let's say you have a CompletableFuture that represents a long-running computation:

CompletableFuture<String> computation = CompletableFuture.supplyAsync(() -> {
    // long-running computation here
    return "result";
});

If you want to interrupt this computation when the CompletableFuture is canceled, you can add a callback to the CompletableFuture like this:

computation.whenComplete((result, exception) -> {
    if (exception instanceof CancellationException) {
        // interrupt the computation here
    }
});

Inside the callback, you can check whether the exception is a CancellationException, which is what the CompletableFuture is completed with when it is canceled. If it is, you can interrupt the computation in whatever way is appropriate, for example by cancelling the Future of the task that runs it.

Alternatively, you can complete the CompletableFuture with an exception like this:

computation.completeExceptionally(new CancellationException());
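
One possible way to fill in the "interrupt the computation here" placeholder is sketched below. It assumes the work is submitted to an ExecutorService so that a Future is available to cancel. The helper supplyInterruptibly and the class name are hypothetical, not JDK API, and the two cancellations are not atomic.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
final class CancelPropagation {

    /** Hypothetical helper: runs the task on the executor and interrupts it if the returned future is cancelled. */
    static <T> CompletableFuture<T> supplyInterruptibly(Callable<T> task, ExecutorService executor) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        Future<?> running = executor.submit(() -> {
            try {
                cf.complete(task.call());
            } catch (Throwable t) {
                cf.completeExceptionally(t);
            }
        });
        cf.whenComplete((result, exception) -> {
            if (exception instanceof CancellationException) {
                running.cancel(true); // interrupts the worker if it is still running
            }
        });
        return cf;
    }

    /** Returns true if cancelling the CompletableFuture interrupted the worker. */
    static boolean cancelInterruptsWorker() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            CompletableFuture<String> cf = supplyInterruptibly(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stands in for a long operation
                } catch (InterruptedException e) {
                    interrupted.countDown();
                }
                return "done";
            }, executor);
            started.await();
            cf.cancel(true);
            return interrupted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + cancelInterruptsWorker());
    }
}
```

The callback is registered on the future returned to the caller, so it only fires when that exact future is cancelled; cancelling a stage derived from it with thenApply or similar would not reach the worker.
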
Arturo Bernal
0

What about this?

/** @return {@link CompletableFuture} which when cancelled will interrupt the supplier
 */
public static <T> CompletableFuture<T> supplyAsyncInterruptibly(Supplier<T> supplier, Executor executor) {
    return produceInterruptibleCompletableFuture((s) -> CompletableFuture.supplyAsync(s, executor), supplier);
}

// in case we want to do the same for similar methods later
private static <T> CompletableFuture<T> produceInterruptibleCompletableFuture(
        Function<Supplier<T>,CompletableFuture<T>> completableFutureAsyncSupplier, Supplier<T> action) {
    FutureTask<T> task = new FutureTask<>(action::get);
    return addCancellationAction(completableFutureAsyncSupplier.apply(asSupplier(task)), () ->
            task.cancel(true));
}

/** Ensures the specified action is executed if the given {@link CompletableFuture} is cancelled.
 */
public static <T> CompletableFuture<T> addCancellationAction(CompletableFuture<T> completableFuture,
                                                             @NonNull Runnable onCancellationAction) {
    completableFuture.whenComplete((result, throwable) -> {
        if (completableFuture.isCancelled()) {
            onCancellationAction.run();
        }
    });
    return completableFuture;  // return original CompletableFuture
}

/** @return {@link Supplier} wrapper for the given {@link RunnableFuture} which calls {@link RunnableFuture#run()}
 *          followed by {@link RunnableFuture#get()}.
 */
public static <T> Supplier<T> asSupplier(RunnableFuture<T> futureTask) throws CompletionException {
    return () -> {
        try {
            futureTask.run();
            try {
                return futureTask.get();
            } catch (ExecutionException e) {  // unwrap ExecutionExceptions
                final Throwable cause = e.getCause();
                throw (cause != null) ? cause : e;
            }
        } catch (CompletionException e) {
            throw e;
        } catch (Throwable t) {
            throw new CompletionException(t);
        }
    };
}
user6519354