I am working with the CompletableFuture for async execution of a stream generated from a list source.
so i am testing the overloaded method i.e. "supplyAsync" of CompletableFuture in which one method takes only single supplier parameter and other takes a supplier parameter and an executor parameter. Here is the documentation for both:
one
supplyAsync(Supplier supplier)
Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.
second
supplyAsync(Supplier supplier, Executor executor)
Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.
And here is my test class:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}
public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}
}
class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}
while the "useCompletableFuture" method takes around 4 seconds to complete, "useCompletableFutureWithExecutor" method takes only 1 second to complete.
No my question is, What different processing does the ForkJoinPool.commonPool() which could do the overhead? In that shouldn't we always prefer the custom executor pool over ForkJoinPool?