
I expose a method in a library, which returns a CompletableFuture. That method's computation happens on a single-threaded Executor which is my bottleneck, hence I do not want any subsequent work to happen on the same thread.

If I use the simple approach of returning the result of "supplyAsync", I'll be exposing my precious thread to the callers, who may be adding synchronous operations (e.g. via thenAccept) which could take some CPU time on that thread.

Repro below:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CfPlayground {
    private ExecutorService preciousExecService = Executors.newFixedThreadPool(1);

    CfPlayground() {}

    private static void log(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }

    CompletableFuture<String> asyncOp(String param) {
        return CompletableFuture.supplyAsync(() -> {
            log("In asyncOp");
            return "Hello " + param;
        }, preciousExecService);
    }

    void syncOp(String salutation) {
        log("In syncOp: " + salutation);
    }

    void run() {
        log("run");
        asyncOp("world").thenAccept(this::syncOp);
    }

    public static void main(String[] args) throws InterruptedException {
        CfPlayground compFuture = new CfPlayground();
        compFuture.run();
        Thread.sleep(500);
        compFuture.preciousExecService.shutdown();
    }
}

This indeed prints:

[main] run
[pool-1-thread-1] In asyncOp
[pool-1-thread-1] In syncOp: Hello world

One solution I found was to introduce another Executor and add a no-op thenApplyAsync with that executor before returning the CompletableFuture:

    CompletableFuture<String> asyncOp(String param) {
        return CompletableFuture.supplyAsync(() -> {
            log("In asyncOp");
            return "Hello " + param;
        }, preciousExecService).thenApplyAsync(s -> s, secondExecService);
    }

This works, but doesn't feel super elegant - is there a better way to do this?

C-B-B
  • Your solution seems like the best one. Instead of creating a second ExecutorService, you could pass [ForkJoinPool.commonPool()](https://docs.oracle.com/en/java/javase/12/docs/api/java.base/java/util/concurrent/ForkJoinPool.html#commonPool%28%29), which is what CompletableFutures use by default if no explicit Executor is given. (I’m not entirely clear on whether methods with no explicit Executor argument always use the commonPool, or always use the current CompletableFuture’s Executor.) – VGR Jun 09 '19 at 18:34
  • Thanks for the suggestion @VGR, that could work instead of the second executor. For completeness, my understanding is that async callbacks without explicit executors will run on the ForkJoinPool.commonPool(), and non-async ones will run either on the calling thread if the CompletableFuture completed immediately, or on the thread that completed the request (e.g. my single-threaded executor) otherwise. There's a more accurate and complete explanation [here](https://stackoverflow.com/questions/46060438/in-which-thread-does-completablefutures-completion-handlers-execute-in/46062939#46062939) – C-B-B Jun 10 '19 at 19:03

2 Answers


There is no feature to detach your completion from the execution of the dependent action. When the thread chaining the dependent action has already completed the registration and your executor’s thread completes the future, which other thread ought to execute the dependent action if no other executor was given?

Your approach of chaining another action with a different executor seems to be the best you can get. However, it’s important to note that in case of an exceptional completion, the exception gets propagated without evaluating functions passed to thenApply. This exception propagation could again lead to an exposure of the thread, if the caller chained an action like whenComplete, handle, or exceptionally.
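The exceptional case can be reproduced with a small sketch. Everything here is illustrative rather than taken from the question: the class name `ExceptionLeakDemo`, the thread names, and the `CountDownLatch`, which only exists to make sure the caller has chained its callback before the task fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExceptionLeakDemo {

    /** Returns the name of the thread that ran the caller's exceptionally() callback. */
    static String callbackThreadOnFailure() {
        // Named threads make the result easy to read; both executors are illustrative.
        ExecutorService precious = Executors.newSingleThreadExecutor(r -> new Thread(r, "precious"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
        CountDownLatch registered = new CountDownLatch(1);
        CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            CompletableFuture<String> cf = CompletableFuture.<String>supplyAsync(() -> {
                awaitQuietly(registered); // wait until the caller has chained its callback
                throw new IllegalStateException("boom");
            }, precious).thenApplyAsync(s -> s, second); // the no-op hop from the question

            // Caller-side callback: record which thread executes it.
            cf.exceptionally(t -> {
                seen.complete(Thread.currentThread().getName());
                return null;
            });
            registered.countDown();
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            precious.shutdown();
            second.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("exceptionally ran on: " + callbackThreadOnFailure());
    }
}
```

Because the function passed to thenApplyAsync is skipped on failure, the hop to the second executor never happens, and the caller's callback should be reported as running on the `precious` thread.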

On the other hand, you don’t need to specify a secondary executor, as you can use the async method without executor parameter, to get the default (common Fork/Join) pool.

So chaining .whenCompleteAsync((x,y) -> {}) is the best solution to your problem so far.
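A minimal sketch of that approach, assuming a setup similar to the question's. The class name `DetachDemo`, the `fail` flag, the thread name, and the latch are hypothetical additions for demonstration; the latch only guarantees that the caller registers its callback before the task finishes, which is the situation where the executor thread would otherwise be exposed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DetachDemo {

    /** Library-side method: the empty whenCompleteAsync moves completion off the precious thread. */
    static CompletableFuture<String> asyncOp(String param, boolean fail,
                                             CountDownLatch gate, ExecutorService precious) {
        return CompletableFuture.supplyAsync(() -> {
            awaitQuietly(gate); // demo only: hold the task until the caller has chained its callback
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "Hello " + param;
        }, precious).whenCompleteAsync((value, error) -> { }); // runs on the default async pool
    }

    /** Returns the name of the thread that ran the caller's synchronous callback. */
    static String callbackThread(boolean fail) {
        ExecutorService precious = Executors.newSingleThreadExecutor(r -> new Thread(r, "precious"));
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<String> seen = new CompletableFuture<>();
        try {
            // handle() is invoked for both normal and exceptional completion.
            asyncOp("world", fail, gate, precious)
                .handle((value, error) -> seen.complete(Thread.currentThread().getName()));
            gate.countDown();
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            precious.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("success callback ran on: " + callbackThread(false));
        System.out.println("failure callback ran on: " + callbackThread(true));
    }
}
```

Unlike thenApplyAsync, the action passed to whenCompleteAsync is invoked on exceptional completion too, so both paths leave the single-threaded executor. The exact thread name printed depends on the JVM's default async pool, so none is shown here.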

Holger
  • Thanks for your reply @Holger, and good point on the exception propagation implications! – C-B-B Jun 11 '19 at 22:06

You could just change the method signature to return a Future instead of a CompletableFuture:

Future<String> asyncOp(String param) {
    return CompletableFuture.supplyAsync(() -> {
        log("In asyncOp");
        return "Hello " + param;
    }, preciousExecService);
}

That way, the call in run() would no longer compile, because Future has no thenAccept method:

void run() {
    log("run");
    asyncOp("world").thenAccept(this::syncOp);
}

The caller would still be able to cast the returned Future back to a CompletableFuture, but that would be quite a misuse of your API and it cannot happen by accident.

Dorian Gray
  • Thanks for the response @Dorian. Agreed returning a Future would prevent callers "stealing" the internal executor, but I'm trying to use CompletableFuture to let callers use Java-8-style callbacks rather than have to wait or block on a Future. Makes sense? Do let me know if I'm missing something here. – C-B-B Jun 10 '19 at 18:48
  • Well, I didn't understand from your posting that you wanted to do so. In that case, my solution won't work. – Dorian Gray Jun 11 '19 at 16:48