36

Suppose I have some async computation, such as:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .thenAccept(foo -> doStuffWithFoo(foo));

Is there a nice way to provide a default value for foo if the async supplier times out according to some specified timeout? Ideally, such functionality would attempt to cancel the slow-running supplier as well. For example, is there standard library functionality that is similar to the following hypothetical code:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .acceptEither(
                CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
                foo -> doStuffWithFoo(foo));

Or perhaps even better:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
        .thenAccept(foo -> doStuffWithFoo(foo));

I know about get(timeout, unit), but am wondering if there's a nicer standard way of applying a timeout in an asynchronous and reactive fashion as suggested in the code above.

EDIT: Here's a solution inspired by the question "Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional?", but unfortunately it blocks a thread. If we relied on createFoo() to check for the timeout asynchronously and throw its own timeout exception, it would work without blocking a thread, but that would place more burden on the author of the supplier and would still incur the cost of creating an exception (which can be expensive without "fast throw").

static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Throwable e2) {
            throw new RuntimeException(e2);
        }
    };
}
CompletableFuture
        .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
        .exceptionally(e -> "default")
        .thenAcceptAsync(foo -> doStuffWithFoo(foo));
jonderry

5 Answers

21

CompletableFuture.supplyAsync is just a helper method that creates a CompletableFuture for you and submits the task to the common ForkJoinPool.

You can create your own supplyAsync with your requirements like this:

private static final ScheduledExecutorService schedulerExecutor = 
                                 Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService = 
                                 Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
        final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
        T defaultValue) {

    final CompletableFuture<T> cf = new CompletableFuture<T>();

    // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a 
    // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
    // Using Executors.newCachedThreadPool instead in the example
    // submit task
    Future<?> future = executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    //schedule watcher
    schedulerExecutor.schedule(() -> {
        if (!cf.isDone()) {
            cf.complete(defaultValue);
            future.cancel(true);
        }

    }, timeoutValue, timeUnit);

    return cf;
}

Creating the CompletableFuture with that helper is as easy as using the static method in CompletableFuture:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
            TimeUnit.SECONDS, "default");

To test it:

    a = supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            // ignore
        }
        return "hi";
    }, 1, TimeUnit.SECONDS, "default");
Ruben
  • 2
    I wondered if the Thread.sleep(2000); is actually interrupted in your example. It **isn't**. If I change your example from `ForkJoinPool.commonPool().submit` to `Executors.newFixedThreadPool(1).submit`, then it is. I wonder why... – Peti Jun 10 '15 at 14:42
  • 2
    You are right @Peti! The commonPool in ForkJoinPool delivers an implementation of Future of type `ForkJoinTask` that doesn't interrupt the Future in case of cancel, while the Executors deliver a `FutureTask` that does. See Javadocs in the `cancel` method of `ForkJoinTask` : _mayInterruptIfRunning: this value has no effect in the default implementation because interrupts are not used to control cancellation_ – Ruben Jun 11 '15 at 18:05
  • 1
    This creates a new thread _every_ call. That's absolutely ridiculous from a performance standpoint. – tclamb Apr 06 '16 at 21:04
  • @Ruben Is there a reason to split work into the cachedPool and scheduled pool? That is, can you remove `executorService` and `future`, and just process `cf` in the callable submitted to `schedulerExecutor`? – RavenMan Apr 16 '17 at 23:39
16

Java 9 adds completeOnTimeout(T value, long timeout, TimeUnit unit), which does what you want, although it does not cancel the slow supplier.

There is also orTimeout(long timeout, TimeUnit unit), which completes the future exceptionally with a TimeoutException if the timeout elapses.
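As a rough illustration of these two methods (Java 9 or later), here is a minimal sketch. The class name TimeoutDemo, the slowFoo() stand-in for createFoo(), and the 500 ms delay are assumptions made for the example, not part of the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutDemo {
    // Hypothetical stand-in for createFoo(): takes roughly 500 ms.
    static String slowFoo() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "foo";
    }

    // Completes with defaultValue if slowFoo() has not finished within timeoutMs.
    // The supplier itself keeps running; it is not cancelled.
    static String fooOrDefault(String defaultValue, long timeoutMs) {
        return CompletableFuture
                .supplyAsync(TimeoutDemo::slowFoo)
                .completeOnTimeout(defaultValue, timeoutMs, TimeUnit.MILLISECONDS)
                .join();
    }

    // Returns true if orTimeout failed the future with a TimeoutException.
    static boolean timesOut(long timeoutMs) {
        try {
            CompletableFuture
                    .supplyAsync(TimeoutDemo::slowFoo)
                    .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                    .join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException.
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(fooOrDefault("default", 50));
        System.out.println(timesOut(50));
    }
}
```

Chaining .exceptionally(e -> defaultValue) after orTimeout gives a similar fallback, with the difference that it also replaces failures thrown by the supplier itself.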

MartyIX
user140547
7

DZone has a good article on how to solve this: https://dzone.com/articles/asynchronous-timeouts

I'm not sure about the copyright of the code, hence I can't copy it here. The solution is very much like the one from Dane White but it uses a thread pool with a single thread plus schedule() to avoid wasting a thread just to wait for the timeout.

It also throws a TimeoutException instead of returning a default.
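The general idea described here can be sketched as follows. This is not the article's code; it is an independent sketch written for this answer, and the names Timeouts, failAfter, within and demo are hypothetical. One daemon scheduler thread fires the timeouts, so no thread sleeps just to wait.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class Timeouts {
    // A single daemon thread fires all timeouts; it never runs the actual work.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Returns a future that fails with a TimeoutException once the delay elapses.
    static <T> CompletableFuture<T> failAfter(long delay, TimeUnit unit) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(
                () -> promise.completeExceptionally(
                        new TimeoutException("timed out after " + delay + " " + unit)),
                delay, unit);
        return promise;
    }

    // Whichever completes first wins: the real result or the timeout failure.
    static <T> CompletableFuture<T> within(CompletableFuture<T> future,
                                           long delay, TimeUnit unit) {
        return future.applyToEither(Timeouts.<T>failAfter(delay, unit),
                                    Function.identity());
    }

    // Demo: work that takes workMs, bounded by timeoutMs, with a default fallback.
    static String demo(long workMs, long timeoutMs) {
        CompletableFuture<String> work = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(workMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "foo";
        });
        return within(work, timeoutMs, TimeUnit.MILLISECONDS)
                .exceptionally(e -> "default")
                .join();
    }
}
```

As with the other approaches, the slow task is only abandoned, not cancelled, so it still occupies a worker thread until it finishes.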

Aaron Digulla
  • 1
    I have a question. In the last sample code given in the DZone article, if `asyncCode()` fails to run within 1s then the exception path is taken. My question is, what happens if this `asyncCode()` is just taking too much time to complete (say, a minute) and during that time a whole bunch of requests (10K) came in and invoked this piece of code? To put it simply, each time a request came in, `asyncCode()` was invoked, but after 1s it's abandoned by the main thread even though the child thread is still working on it. So for 10K requests, wouldn't this exhaust the available memory due to 10K hung child threads? – Cyriac George Mar 06 '20 at 00:54
  • @CyriacGeorge Probably not. First of all, there will be a thread pool somewhere which simply won't run thousands of threads but the configured max amount. The other thing is that the worker threads do complete - they aren't killed. So the thread pool will eventually run out of threads, requests will not be accepted/added to the pool. When the work that took too long is done, the threads will be returned to the pool. – Aaron Digulla Mar 27 '20 at 18:07
  • @CyriacGeorge Use this design carefully; it's not foolproof. If you have tasks that can get stuck forever, the pool will eventually be exhausted. Also, there should be some way to cancel the workers to tell them "we don't care anymore". – Aaron Digulla Mar 27 '20 at 18:09
  • Will a `ScheduledExecutorService` with a single thread suffice if the underlying API is under heavy load? – masT Jul 13 '20 at 13:18
4

I think you'll always need an extra thread monitoring when it's time to supply the default value. I'd probably go the route of having two supplyAsync calls, with the default wrapped in a utility API, linked by an acceptEither. If you'd rather wrap your Supplier, then you could use a utility API that makes the 'either' call for you:

public class TimeoutDefault {
    public static <T> CompletableFuture<T> with(T t, int ms) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) { }
            return t;
        });
    }

    public static <T> Supplier<T> with(Supplier<T> supplier, T t, int ms) {
        return () -> CompletableFuture.supplyAsync(supplier)
            .applyToEither(TimeoutDefault.with(t, ms), i -> i).join();
    }
}

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(Example::createFoo)
        .acceptEither(
            TimeoutDefault.with("default", 1000),
            Example::doStuffWithFoo);

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000))
        .thenAccept(Example::doStuffWithFoo);
Dane White
  • 13
    Sleeping on threads is not a nice way of doing async programming. You're effectively wasting a thread for nothing. – bekce Oct 28 '15 at 08:46
4

There's no standard library method for constructing a CompletableFuture supplied with a value after a timeout. That said, it's really simple to roll your own with minimal resource overhead:

private static final ScheduledExecutorService EXECUTOR
        = Executors.newSingleThreadScheduledExecutor();

public static <T> CompletableFuture<T> delayedValue(final T value,
                                                    final Duration delay) {
    final CompletableFuture<T> result = new CompletableFuture<>();
    EXECUTOR.schedule(() -> result.complete(value),
                      delay.toMillis(), TimeUnit.MILLISECONDS);
    return result;
}

It can be used with the "either" methods of CompletableFuture:

  • acceptEither, acceptEitherAsync
  • applyToEither, applyToEitherAsync
  • runAfterEither, runAfterEitherAsync

One application is using a cached value if a remote service call exceeds some latency threshold:

interface RemoteServiceClient {
    CompletableFuture<Foo> getFoo();
}

final RemoteServiceClient client = /* ... */;
final Foo cachedFoo = /* ... */;
final Duration timeout = /* ... */;

client.getFoo()
    .exceptionally(ignoredException -> cachedFoo)
    .acceptEither(delayedValue(cachedFoo, timeout),
        foo -> /* do something with foo */)
    .join();

In case the remote client call completes exceptionally (e.g. SocketTimeoutException), we can fail fast and use the cached value immediately.

CompletableFuture.anyOf(CompletableFuture<?>...) can be combined with this delayedValue primitive to wrap a CompletableFuture with the above semantics:

@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> withDefault(final CompletableFuture<T> cf,
                                                   final T defaultValue,
                                                   final Duration timeout) {
    return (CompletableFuture<T>) CompletableFuture.anyOf(
        cf.exceptionally(ignoredException -> defaultValue),
        delayedValue(defaultValue, timeout));
}

This nicely simplifies the remote service call example above:

withDefault(client.getFoo(), cachedFoo, timeout)
    .thenAccept(foo -> /* do something with foo */)
    .join();

CompletableFutures are more accurately termed promises, as they decouple the creation of the Future from its completion. Be sure to use dedicated thread pools for heavy CPU work: to create a CompletableFuture for an expensive computation, use the CompletableFuture#supplyAsync(Supplier, Executor) overload, as the #supplyAsync(Supplier) overload defaults to the common ForkJoinPool. The returned CompletableFuture cannot cancel its task, as this functionality isn't exposed by the Executor interface. More generally, dependent CompletableFutures don't cancel their parents, e.g. cf.thenApply(f).cancel(true) does not cancel cf. I'd recommend sticking to the Futures returned by ExecutorServices if you need that functionality.
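To make the last two points concrete, here is a small sketch. The class name CancelDemo and the pool size of 2 are assumptions chosen for the example. It submits work through the supplyAsync(Supplier, Executor) overload and shows that cancelling a dependent stage leaves the parent untouched.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CancelDemo {
    // Returns { dependentCancelled, parentCancelled, parentCompletedWithFoo }.
    static boolean[] run() {
        // Dedicated pool instead of the common ForkJoinPool (size is arbitrary here).
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CountDownLatch release = new CountDownLatch(1);

            // Parent stage: blocks until released, then yields "foo".
            CompletableFuture<String> parent = CompletableFuture.supplyAsync(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "foo";
            }, pool);

            CompletableFuture<Integer> dependent = parent.thenApply(String::length);

            // Cancels only the dependent stage; the parent keeps running.
            dependent.cancel(true);
            boolean parentCancelled = parent.isCancelled();

            // Let the parent finish and observe that it completes normally.
            release.countDown();
            boolean parentCompleted = "foo".equals(parent.join());

            return new boolean[] {
                dependent.isCancelled(), parentCancelled, parentCompleted
            };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("dependent cancelled: " + r[0]);
        System.out.println("parent cancelled: " + r[1]);
        System.out.println("parent completed: " + r[2]);
    }
}
```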

tclamb
  • Will `Executors.newSingleThreadScheduledExecutor()` suffice under heavy load, or do we need to tune it for the expected load? – masT Jul 13 '20 at 13:16