3

I have overridden the execute method for java.util.concurrent.Executor in ThreadPoolExecutor implementation. The new implementation just decorates the runnable and then calls the original execute. The issue I'm having is that if I have two such executors, then following:

supplyAsync(() -> foo(), firstExecutor).thenApplyAsync(firstResult -> bar(), secondExecutor)

translates to two execute calls. Usually they are executed by main and firstExecutor, but sometimes it's main two times.

Does it depend on how long it takes to complete the Suppplier in supplyAsync?

Here's a minimal reproducible example (10k repeats, for me it fails about 3 times java.lang.AssertionError: Unexpected second decorator: main):

package com.foo;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DecorationTest {

    record WhoCalled(String decorator, String runnable) {}

    static class DecoratedExecutor extends ThreadPoolExecutor{

        private final List<WhoCalled> callers;

        public DecoratedExecutor(List<WhoCalled> callers, String threadName) {
            super(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), runnable -> new Thread(runnable, threadName));
            this.callers = callers;
        }

        @Override
        public void execute(final Runnable command) {
            String decoratingThread = Thread.currentThread().getName();
            Runnable decorated = () -> {
                String runningThread = Thread.currentThread().getName();
                callers.add(new WhoCalled(decoratingThread, runningThread));
                command.run();
            };
            super.execute(decorated);
        }
    }

    List<WhoCalled> callers;
    ExecutorService firstExecutor;
    ExecutorService secondExecutor;

    @BeforeEach
    void beforeEach() {
        callers = new ArrayList<>();
        firstExecutor = new DecoratedExecutor(callers, "firstExecutor");
        secondExecutor = new DecoratedExecutor(callers, "secondExecutor");
    }

    @AfterEach
    void afterEach() {
        firstExecutor.shutdown();
        secondExecutor.shutdown();
    }


    @RepeatedTest(10_000)
    void testWhoCalled() throws Exception {
        Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
                .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
                .get();

        assert result == 1;

        WhoCalled firstCallers = callers.get(0);
        assert firstCallers.decorator().equals("main");
        assert firstCallers.runnable().equals("firstExecutor");

        WhoCalled secondCallers = callers.get(1);
        assert secondCallers.decorator().equals("firstExecutor") : "Unexpected second decorator: " + secondCallers.decorator;
        assert secondCallers.runnable().equals("secondExecutor");
    }
}
nluk
  • 684
  • 1
  • 7
  • 14
  • The docs state: The task may execute in a new thread or in an existing pooled thread – Vivick Aug 17 '23 at 14:39
  • Ok, but the `main` thread is not: 1. a new thread 2. a pooled thread. The 'existing' part matches though. Also, the tasks (lambda bodies) run where they should, it's the `String decoratingThread = Thread.currentThread().getName();` part that runs on main. – nluk Aug 17 '23 at 14:41
  • Of course, that part just runs on the calling thread. Why wouldn't it? – Jorn Aug 17 '23 at 14:57
  • First time it should and does run on `main`, that's ok. But the second execution/decoration (of lambda in `thenApplyAsync`) it sometimes runs on `main` and sometimes on `firstExecutor`. That's why the next-to-last assertion does not fail 99% of test repeats. To be precise: `String decoratingThread = Thread.currentThread().getName();` runs always on `main` for `supplyAsync`, and randomly on `main` or `firstExecutor` for `thenApplyAsync`. The `supplyAsync` and `thenApplyAsync` lambdas always run where they should, so on `firstExecutor` and `secondExecutor`. – nluk Aug 17 '23 at 15:03
  • 3
    Javadoc for [`Executor#execute`](https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/Executor.html#execute(java.lang.Runnable)) says (my emphasis): The command may execute in a new thread, in a pooled thread, *or in the calling thread*, at the discretion of the Executor implementation. – Basil Bourque Aug 17 '23 at 15:05
  • @BasilBourque That's a good catch. But I hoped that was the case just for certain implementations, for example the same docs show this implementation: `class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } } ` Now this obviously always runs on the caller thread. So my question should be why does the `ThreadPoolExecutor` (the parent/delegate) sometimes run on main. And are there any ExecutorService implementations that specify where do they run under given circumstances. – nluk Aug 17 '23 at 15:12
  • I’m no expert on this, but I believe if you want to avoid that possible behavior of `Executor#execute` then you should instead call [`ExecutorService#submit`](https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/concurrent/ExecutorService.html#submit(java.lang.Runnable)) method (or others such as `invokeAll`). – Basil Bourque Aug 17 '23 at 16:36
  • Perhaps [some other sources](https://www.ecosia.org/search?q=java%20executor%20execute%20versus%20submit&tts=st_asaf_iphone) can shed some light on the matter of `execute` versus `submit` – Basil Bourque Aug 17 '23 at 16:38
  • Unfortunately I don't explicitly call `execute`. It's something inside the `CompletableFuture` logic that uses the `execute` part of `ExecutorService` interface. And ithe behaviour varies according to some condition, which is unknown at the moment. – nluk Aug 17 '23 at 17:53

1 Answers1

2

Does it depend on how long it takes to complete the Supplier in supplyAsync?

This depends on whether or not the supplyAsync part has been completed before the thenApplyAsync call.

Explanation

Let's split the CompletableFuture chain in the testWhoCalled test for explanation. Instead of:

Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
         .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
         .get();

do:

CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> 1, firstExecutor);
Integer result = firstFuture
         .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
         .get();

The test still fails for me sometimes. There are no changes in logic, but it will allow me easier to explain.

When I create the firstFuture using the supplyAsync method, the firstExecutor can already execute the lamba's (Supplier's) body.

In other words, computation can be already running. This is different from other frameworks. For example, in Project Reactor nothing happens until you subscribe.

So the computation can be already running after we return from the supplyAsync. Let's understand what happens next. There are two possibilities:

  1. firstExecutor has executed the () -> 1 lambda. And the CompletableFuture has the result
  2. The () -> 1 lambda is executing or will be executed later. We don't have result.

Now we do:

firstFuture.thenApplyAsync(...)

Who will submit the second lambda into the secondExecutor?

It will do either main thread or a thread from the firstExecutor:

  • If we have the result, the main thread will submit the task into the secondExecutor. Because no one can do it here except the main thread: the task is already done, the firstExecutor can not submit the task to the secondExecutor
  • If we don't have the result a thread from the firstExecutor submits the task into the secondExecutor

Experiment: add sleep in between

If I add a sleep in the test like:

CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> 1, firstExecutor);
Thread.sleep(1);
Integer result = firstFuture
        .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
        .get();

I have 9986 failures. The sleep call increases the probability that the firstExecutor has completed the task before we add a new callback in thenApplyAsync.

See also "Asynchronous API with CompletableFuture: Performance Tips and Tricks" talk

Denis Zavedeev
  • 7,627
  • 4
  • 32
  • 53
  • This is a great answer! But could you expand on this part: `It will do either main thread or a thread from the firstExecutor:` - why is the firstExecutor sometimes capable to submit the second lambda? You told me when, but why can't it always do that? From what I see in the implementation of `uniApplyStage` if we have the result, then the task will execute in executor, but will be submited by caller (main). But if there's no result we do an `unipush` and the submit will be performed by executor. – nluk Aug 18 '23 at 08:02
  • Or even if we call unipush, but get a result - we call `c.tryFire(SYNC)` inside. – nluk Aug 18 '23 at 08:11
  • "why is the firstExecutor sometimes capable to submit the second lambda? You told me when, but why can't it always do that?" I guess, this is for performance reasons. If we have submitted the task to the `firstExecutor` and it has been completed, then it's more performant to submit the task by the caller, rather than to create a new task for the `firstExecutor` to submit a new task to the `secondExecutor` – Denis Zavedeev Aug 18 '23 at 14:44
  • 2
    @nluk that’s a simple matter of logic. Any thread can call `complete` on a `CompletableFuture` but the future has no possibility to force dependent actions to run on an arbitrary thread (the thread that completed the future). You are assuming that the completing thread will always belong to a pool and that the future remembers the pool and submits dependent actions to that pool but a) this doesn’t have to be the case and b) even the pool could have been shut down in the meantime. So the former completing thread is irrelevant when chaining a dependent action on an already completed future. – Holger Aug 21 '23 at 07:50
  • I understand that, but for me that seems like a poor abstraction on `CompletableFuture` side. And a pretty big oversight in dozens of MDC-related solutions here, like this one: https://stackoverflow.com/questions/6073019/how-to-use-mdc-with-thread-pools – nluk Aug 21 '23 at 13:57
  • 1
    @nluk if you want to decorate tasks, specify a dedicated `Executor`. If your code depends on which thread calls `submit`, the “bad abstraction” is on your side, not the `CompletableFuture`. – Holger Aug 22 '23 at 09:32
  • I do use a dedicated Executor in both `supply` and `apply`, but the decoration is inconsistent. And I wouldn't say that it's >my< abstraction that's wrong. Did you look at the link? Or perhaps a Spring link would be better: https://spring.io/blog/2023/03/28/context-propagation-with-project-reactor-1-the-basics Everyone uses these decorators, but no one warns about the errors you can encounter when combining that with CompletableFuture. And I'm not saying that CF are bad, just from the API user perspective I would expect more control and better, explicit guarantees. – nluk Aug 22 '23 at 10:07