
I have been working with Java's CompletableFuture lately and found that we should always use a custom thread pool with it. I found two ways of passing the thread pool to existing code, shown below.

This is my thread pool, defined in a configuration class:

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
  return new ThreadPoolTaskExecutor();
}

1. Passing the existing thread pool as an argument:

 @Autowired
 @Qualifier("commonThreadPool")
 TaskExecutor existingThreadPool;
 CompletableFuture.runAsync(() -> executeTask(), existingThreadPool);

2. Using @Async, like below:

@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}

Is there a third way, where I can write a CompletableFuture handler or override its existing behaviour in a single place and pass my custom thread pool there? After that, wherever I use the code below, it should pick up my existing thread pool instead of the ForkJoin common pool.

 CompletableFuture.runAsync(() -> executeTask());
Mayur

2 Answers


There is no standard way to replace the default executor for all CompletableFuture instances. But since Java 9, you can define a default executor for subclasses, e.g. with

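import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

public class MyCompletableFuture<T> extends CompletableFuture<T> {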
    static final Executor EXEC = r -> {
        System.out.println("executing "+r);
        new Thread(r).start();
    };

    @Override
    public Executor defaultExecutor() {
        return EXEC;
    }

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MyCompletableFuture<>();
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        Objects.requireNonNull(runnable);
        return supplyAsync(() -> {
            runnable.run();
            return null;
        });
    }

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return new MyCompletableFuture<U>().completeAsync(supplier);
    }
}

you have taken all the steps necessary to define the default executor for all chained stages of the MyCompletableFuture. The executor held in EXEC only serves as an example, producing a printout when used, so when you use that example class like

MyCompletableFuture.supplyAsync(() -> "test")
    .thenApplyAsync(String::toUpperCase)
    .thenAcceptAsync(System.out::println);

it will print

executing java.util.concurrent.CompletableFuture$AsyncSupply@65ab7765
executing java.util.concurrent.CompletableFuture$UniApply@119d7047
executing java.util.concurrent.CompletableFuture$UniAccept@404b9385
TEST
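
As a variation on the class above, the following sketch swaps the printing executor for a fixed thread pool with named threads and records which thread runs each stage. It assumes Java 9 or later; the names PoolBackedFutureDemo, PoolBackedFuture, and the "my-pool-" prefix are hypothetical and chosen only for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class PoolBackedFutureDemo {

    // Hypothetical pool standing in for the application's shared executor.
    static final AtomicInteger COUNTER = new AtomicInteger();
    static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "my-pool-" + COUNTER.incrementAndGet());
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    });

    // Subclass whose async stages default to POOL instead of the common pool.
    static class PoolBackedFuture<T> extends CompletableFuture<T> {
        @Override
        public Executor defaultExecutor() {
            return POOL;
        }

        // Ensures that dependent stages are also PoolBackedFuture instances.
        @Override
        public <U> CompletableFuture<U> newIncompleteFuture() {
            return new PoolBackedFuture<>();
        }

        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return new PoolBackedFuture<U>().completeAsync(supplier);
        }
    }

    // Runs a three-stage chain and returns the name of the thread used by each stage.
    public static List<String> threadNamesOfStages() {
        List<String> names = new CopyOnWriteArrayList<>();
        PoolBackedFuture.supplyAsync(() -> {
                names.add(Thread.currentThread().getName());
                return "test";
            })
            .thenApplyAsync(s -> {
                names.add(Thread.currentThread().getName());
                return s.toUpperCase();
            })
            .thenAcceptAsync(s -> names.add(Thread.currentThread().getName()))
            .join();
        return names;
    }

    public static void main(String[] args) {
        // Each of the three names should carry the "my-pool-" prefix.
        System.out.println(threadNamesOfStages());
    }
}
```

None of the three stages is given an executor explicitly, so the pool threads are picked up only through defaultExecutor() and newIncompleteFuture().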
Holger

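I would strongly recommend against it, but if you really want to, you can use reflection to change the thread pool used by CompletableFuture. Note that this depends on JDK internals: the field is named asyncPool in Java 8 and was renamed in later versions, and the modifiers trick used below no longer works on Java 12 and later.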

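import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

public static void main(String[] args) throws Exception {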
    // Prints ForkJoinPool.commonPool-worker-1
    CompletableFuture<Void> c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();

    setFinalStatic(CompletableFuture.class.getDeclaredField("asyncPool"), Executors.newFixedThreadPool(10));

    // Prints pool-1-thread-1
    c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();
}

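// Overwrites a static final field via reflection (relies on JDK internals).
static void setFinalStatic(Field field, Object newValue) throws Exception {
    field.setAccessible(true); // bypass the private access check
    // Clear the FINAL bit stored in the Field object's own "modifiers" field
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.set(null, newValue); // null target because the field is static
}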

setFinalStatic is taken from https://stackoverflow.com/a/3301720/1398418
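
If the goal is only to have a single place where the pool is configured, a less invasive option than reflection is a small facade that always passes the executor. This is a minimal sketch, assuming a hypothetical class AppFutures (not part of the JDK or Spring) and a hypothetical thread name "app-pool"; in a Spring application, setExecutor could be called once at startup with the commonThreadPool bean.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public final class AppFutures {

    // Hypothetical default; replace it once at startup via setExecutor.
    private static volatile Executor executor = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "app-pool");
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    });

    private AppFutures() {
    }

    // The single place where the pool used by all callers is configured.
    public static void setExecutor(Executor newExecutor) {
        executor = Objects.requireNonNull(newExecutor);
    }

    public static CompletableFuture<Void> runAsync(Runnable task) {
        return CompletableFuture.runAsync(task, executor);
    }

    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, executor);
    }

    public static void main(String[] args) {
        String name = supplyAsync(() -> Thread.currentThread().getName()).join();
        System.out.println(name); // prints app-pool
    }
}
```

Callers then write AppFutures.runAsync(() -> executeTask()) instead of CompletableFuture.runAsync(...). One limitation of this sketch: the returned futures are plain CompletableFuture instances, so chained *Async stages that are not given an executor still run on the JDK's default executor.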

Oleg
  • Thanks @Oleg, but I am looking for something Spring already implements. Does Spring give us any option to do this? – Mayur May 19 '19 at 10:40
  • What do you mean by "Spring implemented thing"? Spring has `@Async`. When you do `CompletableFuture.runAsync(() -> executeTask());` you're just calling a method of a Java standard library class; it has nothing to do with Spring. – Oleg May 19 '19 at 10:54
  • I mean an existing Spring configuration where we can pass the existing thread pool once during application setup, so that it is used internally by every CompletableFuture instead of the ForkJoin common pool. – Mayur May 19 '19 at 11:03
  • Then just do what I suggested during application startup. – Oleg May 19 '19 at 11:45
  • You can also probably achieve your goal with [aop](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#aop). You will need [aspectj](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#aop-using-aspectj); Spring AOP is not enough. – Oleg May 19 '19 at 12:16