
I'm starting to be comfortable with Java CompletableFuture composition, having worked with JavaScript promises. Basically, the composition just schedules the chained commands on the indicated executor. But I'm unsure which thread is running when the composition is performed.

Let's say I have two executors, executor1 and executor2; for simplicity let's say they are separate thread pools. I schedule a CompletableFuture (to use a very loose description):

CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);

Then when that is done I transform the Foo to Bar using the second executor:

CompletableFuture<Bar> futureBar = futureFoo.thenApplyAsync(this::fooToBar, executor2);

I understand that getFoo() will be called from a thread in the executor1 thread pool. I understand that fooToBar() will be called from a thread in the executor2 thread pool.
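A runnable sketch of the setup described so far, assuming two fixed-size pools whose threads get the hypothetical name prefixes "executor1-" and "executor2-" through a custom ThreadFactory. The Foo and Bar classes and the bodies of getFoo() and fooToBar() are placeholders. Each step records the name of the thread that ran it, which confirms the two statements above but says nothing about which thread does the scheduling in between.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class TwoExecutorsDemo {
    // Hypothetical stand-ins for the question's Foo and Bar types.
    static final class Foo { final int value; Foo(int value) { this.value = value; } }
    static final class Bar { final String text; Bar(String text) { this.text = text; } }

    // Names of the threads that ran each step, written by the steps themselves.
    static volatile String getFooThread;
    static volatile String fooToBarThread;

    // ThreadFactory that gives every pool thread a recognizable name prefix.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + counter.incrementAndGet());
    }

    static Foo getFoo() {
        getFooThread = Thread.currentThread().getName();
        return new Foo(10);
    }

    static Bar fooToBar(Foo foo) {
        fooToBarThread = Thread.currentThread().getName();
        return new Bar("bar" + foo.value);
    }

    static Bar run() {
        ExecutorService executor1 = Executors.newFixedThreadPool(2, named("executor1"));
        ExecutorService executor2 = Executors.newFixedThreadPool(2, named("executor2"));
        try {
            CompletableFuture<Foo> futureFoo =
                    CompletableFuture.supplyAsync(TwoExecutorsDemo::getFoo, executor1);
            CompletableFuture<Bar> futureBar =
                    futureFoo.thenApplyAsync(TwoExecutorsDemo::fooToBar, executor2);
            return futureBar.join();
        } finally {
            // Non-daemon pool threads would otherwise keep the JVM alive.
            executor1.shutdown();
            executor2.shutdown();
        }
    }

    public static void main(String[] args) {
        Bar bar = run();
        System.out.println(bar.text + ": getFoo on " + getFooThread + ", fooToBar on " + fooToBarThread);
    }
}
```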

But what thread is used for the actual composition, i.e. after getFoo() finishes and futureFoo is complete, but before the fooToBar() command gets scheduled on executor2? In other words, what thread actually runs the code that schedules the second command on the second executor?

Is the scheduling performed as part of the same thread in executor1 that called getFoo()? If so, would this CompletableFuture composition be equivalent to my simply scheduling fooToBar() on executor2 manually, myself, at the end of the executor1 task?
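For comparison, a sketch of the manual alternative described in that last paragraph, assuming a hypothetical helper chainManually(): the executor1 task runs the first step and then submits the second step to executor2 itself, so the scheduling thread is the executor1 worker by construction. Whether CompletableFuture does the same is exactly the open question; nothing in its documentation promises it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

class ManualChainDemo {
    // Name of the thread that handed the second step to executor2.
    static volatile String schedulingThread;

    // Hypothetical hand-rolled chain: the executor1 task runs the first step and then
    // submits the second step to executor2 itself, completing 'result' at the end.
    static <T, R> CompletableFuture<R> chainManually(Supplier<T> first, Function<T, R> second,
                                                      Executor executor1, Executor executor2) {
        CompletableFuture<R> result = new CompletableFuture<>();
        executor1.execute(() -> {
            try {
                T value = first.get();
                schedulingThread = Thread.currentThread().getName();
                executor2.execute(() -> {
                    try {
                        result.complete(second.apply(value));
                    } catch (Throwable t) {
                        result.completeExceptionally(t);
                    }
                });
            } catch (Throwable t) {
                // Covers failures of the first step and a rejected submission to executor2.
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    static String run() {
        ExecutorService executor1 = Executors.newSingleThreadExecutor(r -> new Thread(r, "executor1"));
        ExecutorService executor2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "executor2"));
        try {
            Supplier<Integer> getFoo = () -> 42;
            Function<Integer, String> fooToBar = foo -> "bar" + foo;
            return chainManually(getFoo, fooToBar, executor1, executor2).join();
        } finally {
            executor1.shutdown();
            executor2.shutdown();
        }
    }

    public static void main(String[] args) {
        String bar = run();
        System.out.println(bar + " (second step scheduled by " + schedulingThread + ")");
    }
}
```

The trade-off of the hand-rolled version is that error propagation, which CompletableFuture does for you, has to be written out explicitly in both tasks.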

dreamcrash
Garret Wilson
  • it's the thread that does the calling; it depends on your code base. But no, it is neither the one from `executor1` nor the one from `executor2`. The entire point of `thenApplyAsync` is to have determinism in those actions only. Maybe [more details here](https://stackoverflow.com/questions/65685715/will-a-chain-of-method-calls-completablefuture-api-execute-asynchronously-if-t) – Eugene Mar 06 '21 at 01:02
  • "it's the thread that does the calling …" What do you mean by that exactly? You mean that if in my `main()` method in a thread named `main-thread` I call the original `CompletableFuture.supplyAsync()`, then when that is finished the call to schedule the `fooToBar()` operation in `executor2` will be in `main-thread` as well? But how can that be, as `main-thread` has asynchronously gone on its merry way and is now factoring primes or whatever? – Garret Wilson Mar 06 '21 at 02:00
  • I might have misunderstood your question a bit, sorry. Is your question which thread will schedule the execution on `executor2` as part of that `thenApplyAsync`? – Eugene Mar 06 '21 at 03:00
  • Yes, @Eugene, that is exactly my question. "In other words, what thread actually runs the code to schedule the second command on the second executor [as part of `thenApplyAsync()`]?" – Garret Wilson Mar 06 '21 at 14:29
  • I'm sure that it's simply the thread that has finished the first future, and I don't see why it should/could be any other. Feel free to read the sources of `CompletableFuture` and `CompletionStage`. I just did and it very much seems like the first future .. the code is just terrible to read and I can't pinpoint it, so I don't want to write an answer. – akuzminykh Mar 07 '21 at 18:54
  • "…the code is just terrible to read and I can't pinpoint it…" haha That's exactly why I asked this question. ;) – Garret Wilson Mar 09 '21 at 01:19
  • @akuzminykh and we were both wrong – Eugene Mar 11 '21 at 13:09
  • @Eugene ¯\_(ツ)_/¯ – akuzminykh Mar 11 '21 at 20:59

2 Answers


This is intentionally unspecified. In practice, it is handled by the same code that handles the chained operations when the variants without the Async suffix are invoked, and it exhibits similar behavior.

So when we use the following test code

CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    return "";
}, r -> new Thread(r, "A").start())
.thenAcceptAsync(s -> {}, r -> {
    System.out.println("scheduled by " + Thread.currentThread());
    new Thread(r, "B").start();
});

it will likely print

scheduled by Thread[A,5,main]

as the thread that completed the previous stage was used to schedule the depending action.
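The test above relies on timing, so the first scenario is only likely. A sketch that forces the ordering instead, assuming OpenJDK's CompletableFuture: the dependent action is registered while the source is still incomplete, and a thread named "A" then completes the source with complete(). The Executor passed to thenAcceptAsync is a hypothetical recorder that notes which thread calls execute() and then runs the task inline. This reflects observed implementation behavior, not a documented guarantee.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class CompleterSchedulesDemo {
    static String whoSchedules() {
        AtomicReference<String> scheduler = new AtomicReference<>();
        // Hypothetical Executor: record the calling thread, then run the task right there.
        Executor recording = task -> {
            scheduler.set(Thread.currentThread().getName());
            task.run();
        };

        CompletableFuture<String> first = new CompletableFuture<>();
        // Registered while 'first' is still incomplete, so nothing is submitted yet.
        CompletableFuture<Void> second = first.thenAcceptAsync(s -> { }, recording);

        // Thread "A" completes the source stage; in practice it also submits the dependent action.
        new Thread(() -> first.complete(""), "A").start();
        second.join();
        return scheduler.get();
    }

    public static void main(String[] args) {
        System.out.println("scheduled by " + whoSchedules());
    }
}
```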

However, when we use

CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "",
    r -> new Thread(r, "A").start());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
first.thenAcceptAsync(s -> {}, r -> {
    System.out.println("scheduled by " + Thread.currentThread());
    new Thread(r, "B").start();
});

it will likely print

scheduled by Thread[main,5,main]

as by the time the main thread invokes thenAcceptAsync, the first future is already completed and the main thread will schedule the action itself.
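The second scenario can also be reproduced without sleeping, assuming the source is already complete (here via CompletableFuture.completedFuture) when thenAcceptAsync is called. A hypothetical Executor that records the calling thread and runs the task inline shows that, in practice, the thread calling thenAcceptAsync is the one that submits the action; this is observed OpenJDK behavior, not a specified one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class AlreadyCompletedDemo {
    static String whoSchedules() {
        AtomicReference<String> scheduler = new AtomicReference<>();
        // Hypothetical Executor: record the calling thread, then run the task right there.
        Executor recording = task -> {
            scheduler.set(Thread.currentThread().getName());
            task.run();
        };
        // The source is complete before any dependent action is attached.
        CompletableFuture<String> first = CompletableFuture.completedFuture("");
        // In practice the calling thread submits the action to the executor immediately.
        first.thenAcceptAsync(s -> { }, recording).join();
        return scheduler.get();
    }

    public static void main(String[] args) {
        System.out.println("called from " + Thread.currentThread().getName()
                + ", scheduled by " + whoSchedules());
    }
}
```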

But that is not the end of the story. When we use

CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
    return "";
}, r -> new Thread(r, "A").start());

Set<String> s = ConcurrentHashMap.newKeySet();
Runnable submitter = () -> {
    String n = Thread.currentThread().getName();
    do {
        for(int i = 0; i < 1000; i++)
            first.thenAcceptAsync(x -> s.add(n+" "+Thread.currentThread().getName()),
                Runnable::run);
    } while(!first.isDone());
};
Thread b = new Thread(submitter, "B");
Thread c = new Thread(submitter, "C");
b.start();
c.start();
b.join();
c.join();
System.out.println(s);

It may print not only the combinations B A and C A from the first scenario and B B and C C from the second. On my machine it also reproducibly prints the combinations B C and C B, indicating that an action passed to thenAcceptAsync by one thread got submitted to the executor by the other thread, which was calling thenAcceptAsync with a different action at the same time.

This matches the scenarios for the thread evaluating the function passed to thenApply (without the Async suffix) described in this answer. As said at the beginning, that was what I expected, as both things are likely handled by the same code. But unlike the thread evaluating the function passed to thenApply, the thread invoking the execute method on the Executor is not even mentioned in the documentation. So, in theory, another implementation could use an entirely different thread, one that neither calls a method on the future nor completes it.
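A sketch of the non-Async counterpart mentioned above, assuming OpenJDK: it records which thread evaluates a function passed to thenApply when the source is already complete (in practice, the caller) and when a thread named "A" completes the source after the dependent stage was registered (in practice, "A"). The method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenApplyThreadDemo {
    // Source already complete: in practice the caller of thenApply evaluates the function.
    static String evaluatorWhenAlreadyComplete() {
        AtomicReference<String> evaluator = new AtomicReference<>();
        CompletableFuture.completedFuture("")
                .thenApply(s -> {
                    evaluator.set(Thread.currentThread().getName());
                    return s;
                })
                .join();
        return evaluator.get();
    }

    // Source completed later by thread "A": in practice "A" evaluates the function.
    static String evaluatorWhenCompletedLater() {
        AtomicReference<String> evaluator = new AtomicReference<>();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = first.thenApply(s -> {
            evaluator.set(Thread.currentThread().getName());
            return s;
        });
        new Thread(() -> first.complete(""), "A").start();
        second.join();
        return evaluator.get();
    }

    public static void main(String[] args) {
        System.out.println("already complete: " + evaluatorWhenAlreadyComplete());
        System.out.println("completed later:  " + evaluatorWhenCompletedLater());
    }
}
```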

Holger

At the end is a simple program that does what your code snippet does and lets you play with it.

The output confirms that the executor you supply is the one that runs each stage once the condition it is waiting on is ready (unless you explicitly call complete() early enough, in which case the dependent stage is triggered from the thread that called complete()). Note that get() on a Future blocks until the Future is finished.

Supply an argument and there are two executors, executor1 and executor2; supply no arguments and there's just one executor. The output is either (same executor: things are run as separate tasks in the same executor, sequentially):

In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-1-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed

Or (two executors: things again run sequentially, but using different executors):

In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-2-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed

Remember: the code passed to the executors can start immediately in another thread. In this example, getFoo() was called before the program even got to setting up the fooToBar stage.

Code follows -

package your.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class TestCompletableFuture {
    // Prints the current thread plus a label, so the output shows which thread did what.
    private static void dumpWhichThread(final String msg) {
        System.err.println("In thread " + Thread.currentThread().toString() + " - " + msg);
    }

    private static final class Foo {
        final int i;
        Foo(int i) {
            this.i = i;
        }
    }

    // Runs on the thread building the pipeline; the returned Supplier runs later on executor1.
    public static Supplier<Foo> getFoo() {
        dumpWhichThread("getFoo");
        return new Supplier<Foo>() {
            @Override
            public Foo get() {
                dumpWhichThread("Supplying Foo");
                return new Foo(10);
            }
        };
    }

    private static final class Bar {
        final String j;
        public Bar(final String j) {
            this.j = j;
        }
    }

    // Runs on the thread building the pipeline; the returned Function runs later on executor2.
    public static Function<Foo, Bar> getFooToBar() {
        dumpWhichThread("getFooToBar");
        return new Function<Foo, Bar>() {
            @Override
            public Bar apply(Foo t) {
                dumpWhichThread("fooToBar");
                return new Bar("" + t.i);
            }
        };
    }

    // No arguments: one shared executor; any argument: two separate executors.
    public static void main(final String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        final TestCompletableFuture obj = new TestCompletableFuture();
        obj.running(args.length == 0);
    }

    private String running(final boolean sameExecutor) throws InterruptedException, ExecutionException, TimeoutException {
        final ExecutorService executor1 = Executors.newSingleThreadExecutor();
        final ExecutorService executor2 = sameExecutor ? executor1 : Executors.newSingleThreadExecutor();
        CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(getFoo(), executor1);
        CompletableFuture<Bar> futureBar = futureFoo.thenApplyAsync(getFooToBar(), executor2);
        try {
            // Try putting a complete here before the get ..
            return futureBar.get(50, TimeUnit.SECONDS).j;
        }
        finally {
            dumpWhichThread("Completed");
            // Shut the pools down; their non-daemon threads would otherwise keep the JVM running.
            executor1.shutdown();
            executor2.shutdown();
        }
    }
}

Which thread triggers the Bar stage to progress? In the program above, it's the executor1 thread. In general, the thread that completes the future (i.e. gives it a value) is what releases the stages depending on it. If you completed futureFoo immediately on the main thread, the main thread would be the one triggering it.
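A sketch of that last point, assuming OpenJDK and a hypothetical latch that keeps the getFoo-like supplier from finishing: the calling thread completes futureFoo itself with complete(), and a hypothetical wrapper around executor2 records which thread hands the fooToBar stage to it. In practice that is the thread that called complete(), and the supplier's own result is discarded.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class EarlyCompleteDemo {
    static volatile String lastBar;

    // Waits on the latch; restores the interrupt flag instead of throwing.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns the name of the thread that handed the fooToBar stage to executor2.
    static String whoSchedulesAfterEarlyComplete() {
        ExecutorService executor1 = Executors.newSingleThreadExecutor();
        ExecutorService executor2 = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> scheduler = new AtomicReference<>();
        // Hypothetical wrapper around executor2 that records who calls execute().
        Executor recordingExecutor2 = task -> {
            scheduler.set(Thread.currentThread().getName());
            executor2.execute(task);
        };
        try {
            CompletableFuture<Integer> futureFoo = CompletableFuture.supplyAsync(() -> {
                awaitQuietly(release);      // the "getFoo" work cannot finish yet
                return 1;
            }, executor1);
            CompletableFuture<String> futureBar =
                    futureFoo.thenApplyAsync(foo -> "bar" + foo, recordingExecutor2);
            futureFoo.complete(10);         // completed early by the calling thread
            lastBar = futureBar.join();     // built from 10, not from the supplier's 1
            return scheduler.get();
        } finally {
            release.countDown();            // let the supplier finish; its result is ignored
            executor1.shutdown();
            executor2.shutdown();
        }
    }

    public static void main(String[] args) {
        String who = whoSchedulesAfterEarlyComplete();
        System.out.println(lastBar + " scheduled by " + who);
    }
}
```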

So you have to be careful with this. If you have N things all waiting on future results but use only a single-threaded executor, then the first one scheduled will block that executor until it completes. You can extrapolate to M threads and N futures: it can decay into M locks (blocked threads) preventing the rest of the work from progressing.
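The starvation described above can be sketched with a single-threaded executor, assuming an arbitrary 200 ms timeout chosen only for the demonstration: a task on the pool's only thread blocks on a second task that is queued behind it on the same pool, so neither can finish. The helper name starves() is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadStarvationDemo {
    // Returns true if the outer task is still stuck after the timeout.
    static boolean starves() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> {
                // The inner task is queued behind this one on the pool's only thread...
                CompletableFuture<String> inner = CompletableFuture.supplyAsync(() -> "inner", single);
                try {
                    // ...so blocking on it here means neither task can ever finish.
                    return inner.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CompletionException(e);
                } catch (ExecutionException e) {
                    throw new CompletionException(e);
                }
            }, single);
            try {
                outer.get(200, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            // shutdownNow() interrupts the blocked worker (get() is interruptible) so the pool can exit.
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("starved: " + starves());
    }
}
```

Using the interruptible get() rather than join() inside the task matters here: join() ignores interrupts, so shutdownNow() could not free the stuck worker.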

Mr R
  • your examples show which thread _executes_ those features, not which _schedules_ them. – Eugene Mar 08 '21 at 01:33
  • It shows both: getFoo and getFooToBar are called at the time the scheduling is done in the main thread (i.e. in the running() method, in the calls to supplyAsync and thenApplyAsync; the method supplied is called at the time of scheduling). Exactly when they will be executed is a function of the specific Executor used, but that definitely won't happen in the main thread. – Mr R Mar 08 '21 at 01:57
  • It's not about which thread calls `supplyAsync` and `thenApplyAsync`. It's about which thread triggers the second, dependent future when the first one is completed. You're missing the point of this question. – akuzminykh Mar 08 '21 at 13:13
  • There is no other "magic" thread ... executor1 releases the chained execution, as executor2 would if bar were then chained onto something else. – Mr R Mar 08 '21 at 20:08
  • Something interesting I noticed (I hadn't used CompletableFuture before): if you complete the future yourself, rather than letting the async task complete it, you do get what you expect delivered to futureBar; however, the executor still runs the Supply function. It wasn't quite what I expected, although it is documented (`https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html`) that CompletableFuture can't cancel the execution, so it makes sense. – Mr R Mar 08 '21 at 20:38
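A sketch of the behavior described in that comment, assuming hypothetical latches to control the ordering: complete() is called before the supplier passed to supplyAsync can finish, the future keeps the manual value, and the supplier still runs to completion on the executor, because completing a CompletableFuture does not stop the computation that was going to complete it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class CompleteDoesNotCancelDemo {
    static volatile String observedValue;

    // Waits on the latch; restores the interrupt flag instead of throwing.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the supplier still ran after the future had been completed by hand.
    static boolean supplierStillRuns() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch supplierDone = new CountDownLatch(1);
        AtomicBoolean supplierRan = new AtomicBoolean();
        try {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                awaitQuietly(release);      // hold the supplier until complete() has been called
                supplierRan.set(true);
                supplierDone.countDown();
                return "from supplier";
            }, executor);
            future.complete("manual");      // the manual value wins
            release.countDown();            // the supplier is not cancelled and keeps going
            awaitQuietly(supplierDone);
            observedValue = future.join();  // still "manual"
            return supplierRan.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean ran = supplierStillRuns();
        System.out.println("supplier ran: " + ran + ", future holds: " + observedValue);
    }
}
```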