How can I pass futures to CompletableFuture.allOf() when the futures are created within a for-loop? I want to create a bunch of futures that should be executed in parallel, and the method should return the result only once all futures have completed:
// Version 1: execute each task asynchronously and return all tasks when finished
public Set<Task> getTasks() {
    var executor = Executors.newCachedThreadPool();
    var tasks = new LinkedHashSet<Task>();
    var futures = new ArrayList<CompletableFuture<Set<Task>>>();
    for (var task : user.getTasks()) {
        // all futures are submitted up front, so they can run in parallel
        futures.add(CompletableFuture.supplyAsync(() -> execute(task), executor));
    }
    for (var f : futures) {
        // join() blocks until this future has finished
        tasks.addAll(f.join());
    }
    return tasks;
}
Or is there an alternative? I have also tried the following, but it also executes the futures one after another (instead of in parallel):
// Version 2:
var executor = Executors.newCachedThreadPool();
var tasks = new LinkedHashSet<Task>();
for (var task : user.getTasks()) {
    CompletableFuture.supplyAsync(() -> execute(task), executor)
            .thenAccept(tasks::addAll).join();
}
EDIT: In the end I have two versions that come close to solving my problem. However, I suspect version A is wrong, because parallel threads add elements to the LinkedHashSet asynchronously, which could cause trouble because LinkedHashSet is not thread-safe:
VERSION A (apparently not thread-safe):
var executor = Executors.newCachedThreadPool();
var tasks = new LinkedHashSet<Task>();
var futures = new ArrayList<CompletableFuture<Void>>();
for (var t : user.getTasks()) {
    futures.add(CompletableFuture.supplyAsync(() -> execute(t), executor).thenAcceptAsync(tasks::addAll));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
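A minimal sketch of one way Version A might be made safe, assuming it is acceptable to wrap the set with Collections.synchronizedSet so that concurrent addAll calls are serialized. The class name, the String element type, and the execute stand-in are hypothetical placeholders for the real Task code. Note that the insertion order then depends on which future completes first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class VersionASketch {

    // Hypothetical stand-in for execute(task): returns a small result set.
    static Set<String> execute(String task) {
        return Set.of(task + "-a", task + "-b");
    }

    static Set<String> runAll(List<String> inputs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // The synchronized wrapper serializes concurrent addAll calls.
            Set<String> tasks = Collections.synchronizedSet(new LinkedHashSet<>());
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (String t : inputs) {
                futures.add(CompletableFuture
                        .supplyAsync(() -> execute(t), executor)
                        .thenAccept(tasks::addAll));
            }
            // Wait until every future (including its thenAccept stage) is done.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            // All writers have finished, so copying without extra locking is safe here.
            return new LinkedHashSet<>(tasks);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("x", "y")));
    }
}
```
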
VERSION B (possibly better, but a little complex):
var executor = Executors.newCachedThreadPool();
var futures = new ArrayList<CompletableFuture<Set<Task>>>();
for (var t : user.getTasks()) {
    futures.add(CompletableFuture.supplyAsync(() -> execute(t), executor));
}
Set<Task> o = CompletableFuture
        .allOf(futures.toArray(new CompletableFuture[0]))
        .thenApplyAsync(v -> futures.stream().flatMap(future -> future.join().stream()))
        .join().collect(Collectors.toSet());
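Version B could perhaps be tidied by moving the allOf plumbing into a small helper. The following is only a sketch under assumptions: mergeAll, the class name, and the String-based execute stand-in are hypothetical names, not part of any library. Inside thenApply every future is already complete, so join() returns immediately, and the merge runs on a single thread, so a plain LinkedHashSet is sufficient.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class VersionBSketch {

    // Hypothetical stand-in for execute(task).
    static Set<String> execute(String task) {
        return Set.of(task + "-a", task + "-b");
    }

    // Hypothetical helper: one future that completes with the union of all results.
    static <T> CompletableFuture<Set<T>> mergeAll(List<CompletableFuture<Set<T>>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    Set<T> merged = new LinkedHashSet<>();
                    for (CompletableFuture<Set<T>> f : futures) {
                        merged.addAll(f.join()); // already complete, does not block
                    }
                    return merged;
                });
    }

    static Set<String> runAll(List<String> inputs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<Set<String>>> futures = new ArrayList<>();
            for (String t : inputs) {
                futures.add(CompletableFuture.supplyAsync(() -> execute(t), executor));
            }
            return mergeAll(futures).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("x", "y")));
    }
}
```

Because the merge walks the futures in list order, results are grouped in the order the tasks were submitted rather than the order in which they finished.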
I cannot find an easier approach. For completeness, here is the shortest version; however, it uses the common ForkJoinPool, which should be avoided (?) for long-running tasks:
// VERSION C: execute in parallel without dealing with the CompletableFuture API:
return user.getTasks()
        .parallelStream()
        .flatMap(t -> execute(t).stream())
        .collect(Collectors.toSet());
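A stream-style variant that keeps a dedicated executor instead of the common pool might look like the sketch below. It assumes two passes are acceptable: the first collects all futures into a list so that every supplyAsync call has been made before anything is joined, and the second joins and merges. Doing both in a single sequential stream pipeline would submit and join one element at a time. The class name and the String-based execute stand-in are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class StreamFuturesSketch {

    // Hypothetical stand-in for execute(task).
    static Set<String> execute(String task) {
        return Set.of(task + "-a", task + "-b");
    }

    static Set<String> runAll(List<String> inputs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // First pass: submit everything. Collecting to a list forces all
            // supplyAsync calls to happen before the first join().
            List<CompletableFuture<Set<String>>> futures = inputs.stream()
                    .map(t -> CompletableFuture.supplyAsync(() -> execute(t), executor))
                    .collect(Collectors.toList());
            // Second pass: wait for each result and merge on the calling thread.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .flatMap(Set::stream)
                    .collect(Collectors.toCollection(LinkedHashSet::new));
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("x", "y")));
    }
}
```
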