Here's a short code version of the problem I'm facing:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
                    /*
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException ignored) {}
                    */
                    //System.out.println("supplyAsync: " + Thread.currentThread().getName());
                    return 1;
                })
                .thenApply(i -> {
                    System.out.println("apply: " + Thread.currentThread().getName());
                    return i + 1;
                })
                .thenAccept(i -> {
                    System.out.println("accept: " + Thread.currentThread().getName());
                    System.out.println("result: " + i);
                })
                .join();
    }
}

This is the output that I get:

apply: main
accept: main
result: 2

I'm surprised to see main there! I expected something like the following, which is what I get when I uncomment the Thread.sleep() call, or even just the single sysout statement:

supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2

I understand that thenApplyAsync() would make sure it doesn't run on the main thread, but I want to avoid handing the data returned by the supplier over from the thread that ran supplyAsync to a different thread that runs thenApply and the subsequent then* stages in the chain.

Srikanth
  • Interesting. Looks like JIT optimization is smart enough to know that returning a constant value takes essentially no time at all, so the overhead of a new thread isn’t worth it. – VGR Jan 05 '22 at 16:01
  • @VGR very unlikely to be related to JIT optimization. It’s just that by the time `thenApply()` is called, the first future is already completed. In that case, the next stage will be executed on the current thread – there is no other thread/executor it could use. If you don’t want that you need to use the `then*Async()` methods, as mentioned by OP at the end of the question. – Didier L Jan 05 '22 at 16:32
  • “_I want to avoid passing the data […]_”: what do you mean by that? Do you mean data stored in `ThreadLocal`? The `CompletableFuture` API does not make any guarantee on the thread that will be used, nor does it expose any context that would be passed between stages. In fact, it wouldn’t work really well for operations that combine multiple stages like `thenCombine()`, `thenCompose()` or `allOf()`. See also [Does CompletableFuture have a corresponding Local context?](https://stackoverflow.com/q/37933713/525036) – Didier L Jan 05 '22 at 16:41
  • @DidierL I don’t think `thenApply` places new tasks on a queue; it merely appends actions to an existing task. So I’m not sure why it would matter whether any “other thread/executor” is available. Even if that did matter, CompletableFuture methods use the common ForkJoinPool if no explicit Executor is given. – VGR Jan 05 '22 at 16:44
  • @VGR The common ForkJoinPool is only used as default for the `*Async()` methods when no executor is provided. It does not apply for any other method. If a future is already completed and you chain a non-async call on it, it will always be executed on the current thread. – Didier L Jan 05 '22 at 16:49
  • [related](https://stackoverflow.com/questions/65685715/will-a-chain-of-method-calls-completablefuture-api-execute-asynchronously-if-t/65691463#65691463) – Eugene Jan 05 '22 at 19:51

The method thenApply evaluates the function in the caller’s thread because the future has already been completed. Of course, when you insert a sleep into the supplier, the future has not yet been completed by the time thenApply is called. Even a print statement might slow down the supplier enough to have the main thread invoke thenApply and thenAccept first. But this is not reliable behavior; you may get different results when running the code repeatedly.
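To make the two cases concrete without relying on timing, here is a minimal sketch (the class, method, and thread names are illustrative, not from the question). In the first case `thenApply` is called on a future that is already complete, so the function runs in the calling thread; in the second, the stage is registered first and the thread that later calls `complete` runs it as part of completing the future.

```java
import java.util.concurrent.CompletableFuture;

public class StageThreadDemo {

    // Case 1: the future is already complete when thenApply is called,
    // so the function is evaluated synchronously in the calling thread.
    static String threadForAlreadyCompleted() {
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(1);
        return done.thenApply(i -> Thread.currentThread().getName()).join();
    }

    // Case 2: the dependent stage is registered while the future is still
    // incomplete; the thread that later calls complete() runs it.
    static String threadForPendingFuture() {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<String> dependent =
                pending.thenApply(i -> Thread.currentThread().getName());
        new Thread(() -> pending.complete(1), "completer").start();
        return dependent.join(); // waits until the completer thread has run the stage
    }

    public static void main(String[] args) {
        System.out.println("already completed: " + threadForAlreadyCompleted());
        System.out.println("pending:           " + threadForPendingFuture());
    }
}
```

Run from `main`, this should print `main` for the first case and `completer` for the second. Unlike the question's code, there is no race here, because the order of registration and completion is fixed.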

Not only does the future not remember which thread completed it; there is also no way to tell an arbitrary thread to execute a particular piece of code. The thread might be busy with something else, be entirely uncooperative, or even have terminated in the meantime.

Just consider

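import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TerminatedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService s = Executors.newSingleThreadExecutor();
        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync: " + Thread.currentThread().getName());
            return 1;
        }, s);
        // Shut the pool down and wait until its only worker thread has terminated.
        s.shutdown();
        s.awaitTermination(1, TimeUnit.DAYS);
        cf.thenApply(i -> {
            System.out.println("apply: " + Thread.currentThread().getName());
            return i + 1;
        })
        .thenAccept(i -> {
            System.out.println("accept: " + Thread.currentThread().getName());
            System.out.println("result: " + i);
        }).join();
    }
}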

How could we expect the functions passed to thenApply and thenAccept to be executed in the already terminated pool’s worker thread?

We could also write

import java.util.concurrent.CompletableFuture;

public class TerminatedCompleterDemo {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Integer> cf = new CompletableFuture<>();

        Thread t = new Thread(() -> {
            System.out.println("completing: " + Thread.currentThread().getName());
            cf.complete(1);
        });
        t.start();
        t.join(); // wait until the completing thread has terminated

        System.out.println("completer: " + t.getName() + " " + t.getState());
        cf.thenApply(i -> {
            System.out.println("apply: " + Thread.currentThread().getName());
            return i + 1;
        })
        .thenAccept(i -> {
            System.out.println("accept: " + Thread.currentThread().getName());
            System.out.println("result: " + i);
        }).join();
    }
}

which will print something like

completing: Thread-0
completer: Thread-0 TERMINATED
apply: main
accept: main
result: 2

Obviously, we can’t insist on this thread processing the subsequent stages.

But even when the thread is a still-alive worker thread of a pool, it doesn’t know that it has completed a future, nor does it have a notion of “processing subsequent stages”. Following the Executor abstraction, it has just received an arbitrary Runnable from the queue, and after processing it, it proceeds with its main loop, fetching the next Runnable from the queue.
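The worker’s side of that abstraction can be sketched with a hypothetical, deliberately minimal single-thread executor. `MiniExecutor` is an illustration, not a JDK class; real pools such as `ThreadPoolExecutor` are far more elaborate. Its worker loop only takes opaque `Runnable`s from a queue and runs them.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, deliberately minimal single-worker executor (not a JDK class).
// The worker only ever sees opaque Runnables: it cannot tell whether a task
// completes a CompletableFuture, nor which stages might depend on it.
public class MiniExecutor implements Executor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    public MiniExecutor(String name) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = queue.take(); // blocks until a task is enqueued
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        // keep the main loop alive if a task fails
                    }
                    // after running the task, simply loop back for the next one
                }
            } catch (InterruptedException e) {
                // interruption serves as the shutdown signal in this sketch
            }
        }, name);
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public void execute(Runnable task) {
        queue.add(task); // thread-safe hand-off from whichever thread submits
    }

    public void shutdownNow() {
        worker.interrupt();
    }

    // Runs a supplier on the given executor and reports which thread executed it.
    static String supplierThreadName(MiniExecutor ex) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), ex)
                .join();
    }

    public static void main(String[] args) {
        MiniExecutor ex = new MiniExecutor("mini-worker");
        System.out.println("supplier ran on: " + supplierThreadName(ex));
        ex.shutdownNow();
    }
}
```

Nothing in the loop refers to futures or stages; follow-up work can only reach this thread through another `execute` call.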

So once the first future has been completed, the only way to tell the thread to do the work of completing other futures is by enqueuing the tasks. This is what happens when using thenApplyAsync with the same pool, or when performing all actions with the …Async methods without an executor, i.e. using the default pool.

When you use a single-threaded executor for all …Async methods, you can be sure that all actions are executed by the same thread, but they will still go through the pool’s queue. Even then, in the case of an already completed future it’s the main thread that actually enqueues the dependent actions, so a thread-safe queue, and hence synchronization overhead, is unavoidable.
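A sketch of that setup, assuming a single-threaded executor whose thread factory (an illustrative choice) names its thread `single-worker`. Each stage is an …Async call with the same executor, so every stage is put into that executor’s queue and run by its one worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadChain {

    private static String current() {
        return Thread.currentThread().getName();
    }

    // Every stage uses an ...Async method with the same single-threaded executor,
    // so each stage is submitted to that executor's queue and run by its one worker.
    static String runChain() {
        ExecutorService single =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "single-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> "supply=" + current(), single)
                    .thenApplyAsync(s -> s + " apply=" + current(), single)
                    .thenApplyAsync(s -> s + " accept=" + current(), single)
                    .join();
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChain());
    }
}
```

`runChain()` should return `supply=single-worker apply=single-worker accept=single-worker`: the same thread every time, but each hand-off still went through the executor’s thread-safe queue.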

But note that even if you manage to create the chain of dependent actions first, before a single worker thread processes them all sequentially, this overhead is still there. Each future’s completion is done by storing the new state in a thread-safe way, making the result potentially visible to all other threads, and atomically checking whether a concurrent completion (e.g. a cancellation) has happened in the meantime. Then the dependent action(s) chained by other threads are fetched, again in a thread-safe way, before they are executed.
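The atomic completion check can be observed through the public API: `complete` returns `false` if the future already holds a result, including a cancellation. The sketch below (class and method names are illustrative) also lets several tasks on the common pool race to complete one future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionRace {

    // A cancellation that happened first makes a later complete() fail.
    static boolean completeAfterCancel() {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        cf.cancel(false);
        return cf.complete(1); // false: the state was already set
    }

    // Several tasks race to complete the same future; the atomic state
    // transition guarantees exactly one complete() call returns true.
    static int successfulCompletions(int racers) {
        CompletableFuture<Integer> target = new CompletableFuture<>();
        AtomicInteger winners = new AtomicInteger();
        CompletableFuture<?>[] tasks = new CompletableFuture<?>[racers];
        for (int i = 0; i < racers; i++) {
            int value = i;
            tasks[i] = CompletableFuture.runAsync(() -> {
                if (target.complete(value)) {
                    winners.incrementAndGet();
                }
            });
        }
        CompletableFuture.allOf(tasks).join(); // wait for all racers
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("complete after cancel: " + completeAfterCancel());
        System.out.println("winners among 8 racers: " + successfulCompletions(8));
    }
}
```

`completeAfterCancel()` returns `false`, and `successfulCompletions(8)` returns `1` regardless of how the racers are scheduled.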

All these actions with synchronization semantics make it unlikely that there is any benefit to processing the data on the same thread when you have a chain of dependent CompletableFutures.

The only way to get truly local processing, with potential performance benefits, is to use

CompletableFuture.runAsync(() -> {
    System.out.println("supplyAsync: " + Thread.currentThread().getName());
    int i = 1;

    System.out.println("apply: " + Thread.currentThread().getName());
    i = i + 1;

    System.out.println("accept: " + Thread.currentThread().getName());
    System.out.println("result: " + i);
}).join();

Or, in other words, if you don’t want detached processing, don’t create detached processing stages in the first place.
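If the steps should stay separate, reusable functions, a variation on the runAsync snippet (a sketch; the step names are hypothetical) is to compose them with `Function.andThen` into one ordinary function and submit only that as a single asynchronous task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class LocalPipeline {

    // Hypothetical step functions mirroring the question's three stages.
    static final Supplier<Integer> SUPPLY = () -> 1;
    static final Function<Integer, Integer> APPLY = i -> i + 1;
    static final Function<Integer, String> FORMAT = i -> "result: " + i;

    static String runAsOneTask() {
        // andThen composes the steps into one ordinary function...
        Function<Integer, String> pipeline = APPLY.andThen(FORMAT);
        // ...which is submitted as a single asynchronous task, so all steps
        // run back to back on whichever pool thread picks up that one task.
        return CompletableFuture.supplyAsync(() -> pipeline.apply(SUPPLY.get())).join();
    }

    public static void main(String[] args) {
        System.out.println(runAsOneTask());
    }
}
```

`runAsOneTask()` returns `result: 2`; no dependent stages are created, so there is nothing to hand off between threads.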

Holger
  • Holger - fantastic answer! You really explained why it's impossible and it makes perfect sense. Thanks for your time! – Srikanth Jan 05 '22 at 17:09