1

As an example, I have a supplier that might take time to run:

Supplier<Integer> numLoader = sneaky(() -> {
    Thread.sleep(10000);
    System.out.println("5 Calculated!");
    return 5;
});

* sneaky is just a utility to convert to runtime exception.

I'd like to be able to do something like this:

Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
// numLoader takes 10 seconds to complete so -1 is returned.
int num = future.get(1000, TimeUnit.MILLISECONDS);
if (resourcesAreLow()) {
    future.cancel(true);
}
doSomethingWithTheValue(num);

I also have a partial implementation for createFutureValueOnTimeout:

private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
    CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
    return new FutureDecorator<V>(completableFuture) {
        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            return completableFuture.completeOnTimeout(v, timeout, unit).get();
        }
    };
}

The problem is that when calling cancel, the sleep isn't interrupted.

  1. How can I get the cancel to work?
  2. Is there an easier way to return a value on timeout?

Complete test:

public class TimeoutTest {
    @SneakyThrows
    @Test
    public void testTimeout() {
        int loadTimeMillis = 10000;
        Supplier<Integer> numLoader = () -> {
            try {
                // Simulate long operation
                Thread.sleep(loadTimeMillis);
            } catch (InterruptedException e) {
                System.out.println("Interrupted! message: " + e.getMessage());
                throw Lombok.sneakyThrow(e);
            }
            System.out.println("5 Calculated!");
            return 5;
        };

        Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
        
        long start = System.currentTimeMillis();

        // numLoader takes 10 seconds to complete so -1 is returned.
        int num = future.get(1000, TimeUnit.MILLISECONDS);

        System.out.println("Got: num: " + num + ". time: " + (System.currentTimeMillis() - start));

        if (resourcesAreLow()) {
            future.cancel(true);
        }
        // Don't stop the test. Give time for the cancel to kick in.
        Thread.sleep(loadTimeMillis);
        System.out.println("Finished. Time: " + (System.currentTimeMillis() - start));
    }

    private boolean resourcesAreLow() {
        return true;
    }

    private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
        CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
        return new FutureDecorator<V>(completableFuture) {
            @Override
            public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
                return completableFuture.completeOnTimeout(v, timeout, unit).get();
            }
        };
    }

    private static class FutureDecorator<V> implements Future<V> {
        private final Future<V> inner;

        private FutureDecorator(Future<V> inner) {this.inner = inner;}

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return inner.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return inner.isCancelled();
        }

        @Override
        public boolean isDone() {
            return inner.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return inner.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return inner.get(timeout, unit);
        }
    }
}

Output: (Notice the lack of the Interrupted! message)

Got: num: -1. time: 1007
5 Calculated!
Finished. Time: 11021
AlikElzin-kilaka
  • 34,335
  • 35
  • 194
  • 277
  • The problem seems strangely simple... At least part of the answer is in the docs of `cancel`: ***mayInterruptIfRunning** this value has no effect in this implementation because interrupts are not used to control processing.* – ernest_k Jan 30 '22 at 19:06
  • 1
    And yet another part of the answer is that you can't count on `cancel` to stop the actual work done in the supplier; [seems you'll have to do it yourself](https://stackoverflow.com/questions/29013831/how-to-interrupt-underlying-execution-of-completablefuture). I think the solution will be complete if you combine `completeOnTimeout` and something like `whenComplete` on its resulting CompletableFutre, making that `whenComplete` stop the actual work (in cases where `mayInterruptIfRunning == true`, of course) – ernest_k Jan 30 '22 at 19:27
  • 1
    Why do you focus on adding an unsupported feature to `CompletableFuture`? Just use `Future future = ForkJoinPool.commonPool().submit(() -> { Thread.sleep(10000); System.out.println("5 Calculated!"); return 5; });` It uses the same pool as `CompletableFuture` uses (by default) but doesn’t require `sneaky` and it provides a future supporting cancelation with interruption. Good old Java 5 concurrency API… – Holger Jan 31 '22 at 13:33
  • @Holger, I'm trying to get some default value on timeout. CompletableFuture can help do that. Can't see how I can do it with a "regular" future. – AlikElzin-kilaka Feb 01 '22 at 11:00
  • 1
    I don’t see how “CompletableFuture can help do that”. In your examples, all relevant variables are literally declared as `Future`. You already have a wrapper implementing `Future`, so all you need to do, is to allow this wrapper to be completed with the default values when canceling the wrapped future, assuming that you wrap a future which does support cancelation. – Holger Feb 01 '22 at 12:10
  • @Holger, `CompletableFuture` helps by using it's api: `completableFuture.completeOnTimeout(v, timeout, unit)`. I don't need to handle the timeout myself. – AlikElzin-kilaka Feb 02 '22 at 07:04
  • 1
    You want to cancel the original job on timeout and `completeOnTimeout` doesn’t do that. So no, it does not help. Catching the `TimeoutException` thrown by `get` instead, requires a few lines of code, on the other hand, you’re saving the same number of lines by not needing try/catch in the Callable when using `submit`. It’s quite simple. When you want cancelation, you have to use a tool that supports cancelation. See also [What is the XY problem?](https://meta.stackexchange.com/questions/66377/what-is-the-xy-problem/66378#66378) – Holger Feb 02 '22 at 09:58
  • @Holger, I think there was a misunderstanding here. I was merely stating there are 2 objectives: 1. return default value on timeout. 2. On the same timeout, cancel the still background running task. CompletableFuture is very easy doing the first one, keeping it clean, without needing to catch stuff. I understand you saying it doesn't solve the cancelation part. – AlikElzin-kilaka Feb 02 '22 at 11:56

1 Answers1

2

You can combine the Executor/Future API which supports cancelation with CompletableFuture:

public static <R> CompletableFuture<R> withInterruptionSupport(Callable<R> c) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    FutureTask<R> ft = new FutureTask<R>(c) {
        @Override
        protected void set(R v) {
            super.set(v);
            cf.complete(v);
        }
        @Override
        protected void setException(Throwable t) {
            super.setException(t);
            cf.completeExceptionally(t);
        }
    };
    cf.defaultExecutor().execute(ft);
    cf.whenComplete((x,y) -> ft.cancel(true));
    return cf;
}

Since support for interruption in the actual function typically implies dealing with InterruptedException, it’s convenient to use Callable instead of Supplier, so it is allowed to throw this exception.

The Future which supports cancelation with interruption is unconditionally canceled whenever the CompletableFuture completes, which works without problems as whenever the completion stems from the task itself, the future is already completed and the subsequent cancelation will be ignored.

This means, we do not need to distinguish between the different possibilities of completion here. Not only does completeOnTimeout work, you can also invoke cancel(…) on the CompletableFuture and it will interrupt the evaluation of the Callable (the boolean argument still is irrelevant though). Even calling complete with an alternative result without waiting for a timeout would interrupt the now-obsolete evaluation.

So, the following works:

for(int timeout: new int[] { 5, 15 }) {
    System.out.println("with timeout of " + timeout);
    Integer i = withInterruptionSupport(() -> {
            Thread.sleep(10000);
            System.out.println("5 Calculated!");
            return 5;
        })
        .completeOnTimeout(42, timeout, TimeUnit.SECONDS)
        .join();
    System.out.println("got " + i);
}
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
with timeout of 5
got 42
with timeout of 15
5 Calculated!
got 5
Holger
  • 285,553
  • 42
  • 434
  • 765
  • @holgar, Thanks. Upvoted and marked accepted. Nice trick with the `FutureTask`. I still needed the decorator to make `get` behave like `completeOnTimeout`. – AlikElzin-kilaka Feb 07 '22 at 18:25