I have overridden the execute
method for java.util.concurrent.Executor
in ThreadPoolExecutor
implementation. The new implementation just decorates the runnable and then calls the original execute. The issue I'm having is that if I have two such executors, then following:
supplyAsync(() -> foo(), firstExecutor).thenApplyAsync(firstResult -> bar(), secondExecutor)
translates to two execute
calls. Usually they are executed by main
and firstExecutor
, but sometimes it's main
two times.
Does it depend on how long it takes to complete the Suppplier in supplyAsync?
Here's a minimal reproducible example (10k repeats, for me it fails about 3 times java.lang.AssertionError: Unexpected second decorator: main
):
package com.foo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class DecorationTest {
record WhoCalled(String decorator, String runnable) {}
static class DecoratedExecutor extends ThreadPoolExecutor{
private final List<WhoCalled> callers;
public DecoratedExecutor(List<WhoCalled> callers, String threadName) {
super(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), runnable -> new Thread(runnable, threadName));
this.callers = callers;
}
@Override
public void execute(final Runnable command) {
String decoratingThread = Thread.currentThread().getName();
Runnable decorated = () -> {
String runningThread = Thread.currentThread().getName();
callers.add(new WhoCalled(decoratingThread, runningThread));
command.run();
};
super.execute(decorated);
}
}
List<WhoCalled> callers;
ExecutorService firstExecutor;
ExecutorService secondExecutor;
@BeforeEach
void beforeEach() {
callers = new ArrayList<>();
firstExecutor = new DecoratedExecutor(callers, "firstExecutor");
secondExecutor = new DecoratedExecutor(callers, "secondExecutor");
}
@AfterEach
void afterEach() {
firstExecutor.shutdown();
secondExecutor.shutdown();
}
@RepeatedTest(10_000)
void testWhoCalled() throws Exception {
Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
.thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
.get();
assert result == 1;
WhoCalled firstCallers = callers.get(0);
assert firstCallers.decorator().equals("main");
assert firstCallers.runnable().equals("firstExecutor");
WhoCalled secondCallers = callers.get(1);
assert secondCallers.decorator().equals("firstExecutor") : "Unexpected second decorator: " + secondCallers.decorator;
assert secondCallers.runnable().equals("secondExecutor");
}
}