15

In java-9 the new method completeOnTimeout in the CompletableFuture class was introduced:

public CompletableFuture<T> completeOnTimeout(T value, long timeout,
                                              TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(
                                       new DelayedCompleter<T>(this, value),
                                       timeout, unit)));
    return this;
}

What I do not understand is why it uses the static ScheduledThreadPoolExecutor inside its implementation:

    static ScheduledFuture<?> delay(Runnable command, long delay,
                                    TimeUnit unit) {
        return delayer.schedule(command, delay, unit);
    }

Where

    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }

For me it is a very strange approach, as it can become a bottleneck for the whole application: the only one ScheduledThreadPoolExecutor with the only one thread keeping inside the pool for all possible CompletableFuture tasks?

What am I missing here?

P.S. It looks like:

  1. authors of this code were reluctant to extract this logic and preferred to reuse the ScheduledThreadPoolExecutor,

  2. and this apparently leaded to a such solution with a static variable, because it is very inefficient to create a new executor for each CompletableFuture.

But my doubt still remains, as I find general approach strange.

Andremoniy
  • 34,031
  • 20
  • 135
  • 241
  • the use of `setRemoveOnCancelPolicy(true);` prevents bottleneck, no? By default is false. You are expecting majority of futures to complete, then if not handled by the timeout of course and any cancellations to be dealt with by this call? – Cowboy Farnz Mar 27 '18 at 11:26
  • I am not sure how is `setRemoveOnCancelPolicy` related to the bottleneck problem. – Andremoniy Mar 27 '18 at 11:27
  • Think because by default, cancelled tasks are not automatically removed from the work queue until delay elapses. – Cowboy Farnz Mar 27 '18 at 11:29
  • But it is about canceled tasks, this is a specific case. I am talking about regular usage – Andremoniy Mar 27 '18 at 11:30
  • but wouldnt that be the point of using the static `ScheduledThreadPoolExecutor` inside it's implementation -to handle such specific cases as it extends `ThreadPoolExecutor`? – Cowboy Farnz Mar 27 '18 at 11:32
  • 1
    By *..the only one ScheduledThreadPoolExecutor with the only one thread keeping inside the pool..* are you saying that only one daemon thread created by the `DaemonThreadFactory` would exist? Maybe I am not getting the exact point put forward there, but could you please explain a little better? Esp. what do you mean by *...for all possible CompletableFuture tasks..* – Naman Mar 27 '18 at 11:38
  • @nullpointer I am talking about `corePoolSize` which equals `1` here – Andremoniy Mar 27 '18 at 11:40
  • I guess they measured this solution could already support a high load – especially if we assume that most tasks will not reach the timeout. I suppose this is mainly provided for convenience, as it is rather easy to implement a custom solution if this one does not satisfy your needs. I'm afraid this will be primarily opinion based though… – Didier L Mar 27 '18 at 11:41
  • @DidierL that's a good point. Also I am thinking that they were lazy or reluctant to implement the delaying logic without SchedulerExecutor, and this leaded to need of the static executor, because apparently it is very inefficient to create executor for each CompletableFuture instance. – Andremoniy Mar 27 '18 at 12:12
  • 3
    Using a `ScheduledThreadPoolExecutor` avoids the need to keep threads sleeping while waiting for the timeout, so I believe that is an appropriate solution. The problem that I hadn't noticed previously is that the `DelayedCompleter` will trigger the execution of dependent stages on the same thread. This means your dependent stages should always use the `*Async()` variants to avoid using that executor. They should have provided variants where you give your own executor instead… – Didier L Mar 27 '18 at 12:36

2 Answers2

7

You are right, this could become a bottleneck, but not for the completion itself, which is merely setting a variable in the CompletableFuture. That single thread could complete millions of futures in a second. The critical aspect is that the completion could trigger the evaluation of dependent stages within the completing thread.

So

Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApply(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");

will print

set up
long dependent action 1 Thread[CompletableFutureDelayScheduler,5,main]
timeout
long dependent action 2 Thread[CompletableFutureDelayScheduler,5,main]
timeout
12 s

Using the …Async chaining methods will eliminate the issue

Executor neverDone = r -> {};
long t0 = System.nanoTime();
CompletableFuture<String> c11 =
    CompletableFuture.supplyAsync(() -> "foo", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 1 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
CompletableFuture<String> c12 =
    CompletableFuture.supplyAsync(() -> "bar", neverDone)
        .completeOnTimeout("timeout", 2, TimeUnit.SECONDS)
        .thenApplyAsync(s -> {
            System.out.println("long dependent action 2 "+Thread.currentThread());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return s;
        });
System.out.println("set up");
CompletableFuture.allOf(
    c11.thenAccept(System.out::println),
    c12.thenAccept(System.out::println)
).join();
System.out.println(Math.round((System.nanoTime()-t0)*1e-9)+" s");

will print

set up
long dependent action 2 Thread[ForkJoinPool.commonPool-worker-2,5,main]
long dependent action 1 Thread[ForkJoinPool.commonPool-worker-9,5,main]
timeout
timeout
7 s

The conclusion is that when you have a potentially lengthy evaluation, you should always chain is via one of the …Async methods. Given the absence of control over the executing thread when using the methods without the “…Async” suffix (it could also be the thread calling the chaining method or any other thread calling a “completion method”, see also this answer), this is what you always should do.

Holger
  • 285,553
  • 42
  • 434
  • 765
4

For sure, this is a question to be answered by the authors. Anyway, here's my opinion on the matter.

What I do not understand is why does it use the static ScheduledThreadPoolExecutor inside its implementation:

...

For me it is a very strange approach, as it can become a bottleneck for the whole application: the only one ScheduledThreadPoolExecutor with the only one thread keeping inside the pool for all possible CompletableFuture tasks?

You're right. The ScheduledThreadPoolExecutor can run arbitrary code. Specifically, orTimeout() and completeOnTimeout() will call completeExceptionally() and complete(), which, by default, call dependents synchronously.

To avoid this behavior, you must use your own CompletionStage or subclass of CompletableFuture which makes non-*Async methods always call *Async methods. This is much easier since Java 9 by overriding newIncompleteFuture().

It looks like:

1) authors of this code were reluctant to extract this logic and preferred to reuse the ScheduledThreadPoolExecutor,

When ForkJoinPool appeared in Java 7, it lacked a common thread pool. Java 8 introduced the static commonPool(), used by default (among others) in the introduced CompletableFuture and Stream classes.

It seems they were reluctant to expose a common scheduled executor. This would be just as useful as the common thread pool to avoid having many rarely used scheduled executors spread out.

If you need delayed tasks with static intervals, then CompletableFuture.delayedExecutor() is probably good enough, given a small overhead of wrapping objects.

For variable intervals, there's the extra overhead of creating a wrapper Executor each time, but there are already a few created objects along the way, such as new instances of the internal Canceller, Timeout, DelayedCompleter and TaskSubmitter classes.

How often do we need to delay many tasks in variable intervals? Pure asynchronous code may do it all the time for varying timeouts, but since we don't have the scheduled executor itself exposed, either we assume this overhead or we use yet another static scheduler.

2) and this apparently leaded to such a solution with the static variable, because it is very inefficient to create a new executor for each CompletableFuture.

Exactly.

acelent
  • 7,965
  • 21
  • 39