
How can I combine futures with CompletableFuture.allOf() when the futures are created within a for-loop? I want to create a bunch of futures that are executed in parallel, and the method should return the result only when all futures have completed:

// Version 1: execute each task asynchronously and return all tasks when finished
public Set<Task> getTasks(){
    var executor = Executors.newCachedThreadPool();
    var tasks = new LinkedHashSet<Task>();
    var futures = new ArrayList<CompletableFuture<Set<Task>>>();
    for (var task : user.getTasks()) {
        // all futures are executed in parallel
        futures.add(CompletableFuture.supplyAsync(() -> execute(task), executor));
    }
    for (var f : futures) {
        // this blocks until the current future has finished
        tasks.addAll(f.join());
    }

    return tasks;
}

Or is there another alternative? I have also tried the following, but it executes the futures one after another (instead of in parallel):

// Version 2:
var executor = Executors.newCachedThreadPool();
var tasks = new LinkedHashSet<Task>();
for (var task : user.getTasks()) {
    CompletableFuture.supplyAsync(() -> execute(task), executor)
                     .thenAccept(tasks::addAll).join();
}

EDIT: In the end I have two versions that come close to solving my problem. However, I suspect version A is not correct, because parallel threads add elements to the LinkedHashSet concurrently (which could cause trouble, because LinkedHashSet is not thread-safe):

VERSION A (apparently not thread-safe):

var executor = Executors.newCachedThreadPool();
var tasks = new LinkedHashSet<Task>();
var futures = new ArrayList<CompletableFuture<Void>>();
for (var t : user.getTasks()) {
    futures.add(CompletableFuture.supplyAsync(() -> execute(t), executor).thenAcceptAsync(tasks::addAll));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
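If insertion order does not matter, one way to make Version A safe is to collect into a concurrent set instead of a LinkedHashSet. The sketch below is an illustration, not the asker's code: `execute(int)` is a hypothetical stand-in that returns a `Set<Integer>`, the pool size of 4 is arbitrary, and `ConcurrentHashMap.newKeySet()` replaces the LinkedHashSet (so the order of results is no longer preserved).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VersionAThreadSafe {

    // Hypothetical stand-in for the question's execute(task): one input yields a set of results
    static Set<Integer> execute(int n) {
        return Set.of(n * 10, n * 10 + 1);
    }

    static Set<Integer> getTasks(List<Integer> inputs) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // arbitrary bounded size
        try {
            // A concurrent set may be mutated by several pool threads at the same time
            Set<Integer> results = ConcurrentHashMap.newKeySet();
            var futures = new ArrayList<CompletableFuture<Void>>();
            for (var n : inputs) {
                futures.add(CompletableFuture
                        .supplyAsync(() -> execute(n), executor)
                        .thenAccept(results::addAll));
            }
            // Wait until every addAll callback has run
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```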

And VERSION B (which may be better, but is a little complex):

var executor = Executors.newCachedThreadPool();
var futures = new ArrayList<CompletableFuture<Set<Task>>>();
for (var t : user.getTasks()) {
    futures.add(CompletableFuture.supplyAsync(() -> execute(t), executor));
}
Set<Task> o = CompletableFuture
                 .allOf(futures.toArray(new CompletableFuture[0]))
                 .thenApplyAsync(v -> futures.stream().flatMap(future -> future.join().stream()))
                 .join().collect(Collectors.toSet());
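Version B can be written a little more directly: since all futures have completed by the time `allOf` completes, a plain `thenApply` can collect the results, and `Collectors.toCollection(LinkedHashSet::new)` keeps the order of the input list, which `Collectors.toSet()` does not guarantee. A sketch, assuming a hypothetical `execute(int)` and an arbitrary pool size of 4:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class VersionBSimplified {

    // Hypothetical stand-in for the question's execute(task)
    static Set<Integer> execute(int n) {
        return Set.of(n * 10, n * 10 + 1);
    }

    static Set<Integer> getTasks(List<Integer> inputs) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            var futures = new ArrayList<CompletableFuture<Set<Integer>>>();
            for (var n : inputs) {
                futures.add(CompletableFuture.supplyAsync(() -> execute(n), executor));
            }
            return CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture[0]))
                    // Every future is already complete here, so join() returns immediately
                    .thenApply(v -> futures.stream()
                            .flatMap(f -> f.join().stream())
                            .collect(Collectors.toCollection(LinkedHashSet::new)))
                    .join();
        } finally {
            executor.shutdown();
        }
    }
}
```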

I cannot find an easier approach, but for completeness I add the following code, which is the shortest. However, it runs on the common ForkJoinPool, which should perhaps be avoided for long-running tasks (?):

// VERSION C: execute in parallel without the CompletableFuture API:
return user.getTasks()
        .parallelStream()
        .flatMap(t -> execute(t).stream())
        .collect(Collectors.toSet());
nimo23
  • Are your tasks fast to complete? If I understand `CompletableFuture` correctly, they are executed immediately unless a delay is provided. – Scratte Feb 19 '21 at 18:13
  • According to the answer below, version 1 works as expected. However, I cannot say the same for version 2. Maybe I am wrong and version 2 works like version 1? – nimo23 Feb 19 '21 at 18:20
  • You could just test it by printing `System.nanoTime()` inside your `execute()`, no? – Scratte Feb 19 '21 at 19:00
  • Version 2 is not right, because it blocks until each future completes before the next one is even created. – nimo23 Feb 19 '21 at 19:25
  • I see now. You're saying the loop doesn't progress due to the `join()`. That is also true for version 1 in your example, but by the time that loop runs, the `CompletableFuture`s have all been started. – Scratte Feb 19 '21 at 19:34
  • This is a bit weird: your `execute(task)` returns a `Set`? As in, from one task you get multiple other tasks? – Eugene Feb 19 '21 at 21:21
  • @Eugene yes, each task returns a `Set`, whose elements I then add to the main set with `addAll`. I made this clearer with the `parallelStream().flatMap()` code above. – nimo23 Feb 19 '21 at 21:53

4 Answers


Your code should work as it is. That is, the second for loop in your first example (the one calling join()) waits for the first future to complete before proceeding to the second one, but in the meantime all the other futures are running concurrently. They typically start executing as soon as you call supplyAsync. To demonstrate this, here is a self-contained executable program:

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        var executor = Executors.newCachedThreadPool();
        var results = new ArrayList<String>();
        var futures = new ArrayList<CompletableFuture<String>>();
        futures.add(CompletableFuture.supplyAsync(() -> sleep(2), executor));
        TimeUnit.MILLISECONDS.sleep(100);
        futures.add(CompletableFuture.supplyAsync(() -> sleep(1), executor));

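        // Both futures are already running; join() just waits for each result in turn
        for (var f : futures) {
            results.add(f.join());
        }
        executor.shutdown(); // otherwise idle pool threads keep the JVM alive for up to 60 seconds

        results.forEach(System.out::println);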
    }

    private static String sleep(int seconds) {
        var start = LocalTime.now();
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        var end = LocalTime.now();
        return String.format("Thread %s started at %s and finished at %s",
                Thread.currentThread().getId(), start, end);
    }
}

The output shows that the two futures ran concurrently: the second future finished before the first, as expected:

Thread 14 started at 17:49:35.202673531 and finished at 17:49:37.206196631
Thread 15 started at 17:49:35.262183490 and finished at 17:49:36.262342704
k314159
  • Do you mean the first or the second version? Or do both versions behave the same? Should I use `CompletableFuture.allOf()` for this use case at all? – nimo23 Feb 19 '21 at 18:01

CompletableFuture.allOf() is pretty simple to use here with the Stream API:

CompletableFuture.allOf(user.getTasks().stream()
            .map(task -> CompletableFuture.supplyAsync(() -> execute(task), executor))
            .toArray(CompletableFuture[]::new))
            .join();
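The snippet above waits for the tasks but discards their results. A common way to keep them is a small helper that turns a list of futures into a future of a list. The name `allOfList` is hypothetical (it is not part of the JDK), and the sketch assumes the futures have been collected into a `List` (for example with `.collect(Collectors.toList())` instead of `.toArray(...)`):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FutureLists {

    // Hypothetical helper: List<CompletableFuture<T>> -> CompletableFuture<List<T>>
    static <T> CompletableFuture<List<T>> allOfList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // Runs only once every input future is complete, so join() does not block here
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```

If any input future fails, the future returned by `allOfList` completes exceptionally as well, because `allOf` does and the `thenApply` step is then skipped.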
Nikolai Shevchenko
  • The `execute(task)` returns a set of tasks that should be stored in `tasks` (see my version). With your solution, I cannot get any result.
  • OK, your solution works when using `...map(task -> CompletableFuture.supplyAsync(() -> execute(task), executor).thenAccept(tasks::addAll)).toArray(CompletableFuture[]::new)).join();`
  • But the question still stands: is **version 2** also correct? If so, I would prefer it over adding the unnecessary overhead of the Stream API.
  • @nimo23 `thenAccept(tasks::addAll)` will be executed by multiple threads and relies on side effects; this is a terrible solution. – Eugene Feb 19 '21 at 21:31
  • @Eugene yes, I know; that's why I'm preparing a new question. – nimo23 Feb 19 '21 at 21:32
  • @nimo23 Don't. The question is already rather clear here; it's just that this is not the proper answer. – Eugene Feb 19 '21 at 21:33

Of course your second variant will execute one after another:

CompletableFuture.supplyAsync(() -> execute(task), executor)
                 .thenAccept(tasks::addAll)
                 .join();

You call join() on each future inside the loop, which blocks the thread before the next task is even submitted.

The second problem is the use of newCachedThreadPool. I'll explain it using the JDK's HttpClient as an example. Early versions stated in the documentation that it uses a cached thread pool; later that statement was removed from the documentation. The cached pool is still in the implementation, but it will also be removed in time. The problem is that such a pool, when used incorrectly, will consume all your resources and kill your application. No free threads left? Sure, it creates a new one, and so on... Eventually this will hurt you. Use a pool with a bounded number of threads.
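A sketch of the Version 1 pattern on a bounded pool. The `execute(int)` method is a hypothetical stand-in, and capping the pool at the number of available processors is an illustrative choice for CPU-bound work (I/O-bound tasks may justify a larger, but still fixed, bound):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BoundedPool {

    // Hypothetical stand-in for execute(task)
    static Set<Integer> execute(int n) {
        return Set.of(n * 10, n * 10 + 1);
    }

    static Set<Integer> getTasks(List<Integer> inputs) {
        // Never more than this many threads, however many tasks are submitted
        int poolSize = Math.max(1, Math.min(inputs.size(), Runtime.getRuntime().availableProcessors()));
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit everything first, so all tasks can run while we wait below
            var futures = new ArrayList<CompletableFuture<Set<Integer>>>();
            for (var n : inputs) {
                futures.add(CompletableFuture.supplyAsync(() -> execute(n), executor));
            }
            var results = new LinkedHashSet<Integer>();
            for (var f : futures) {
                results.addAll(f.join()); // only the calling thread touches the set
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```

Tasks beyond the pool size wait in the executor's queue instead of causing new threads to be created.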


To answer your question: you are looking for some kind of flatMap that turns a CompletableFuture<Set<X>> into a Set<CompletableFuture<X>>. No such non-blocking method exists. You need to call join, but you can delay that call via a trick:

user.getTasks().stream()
               .map(each -> CompletableFuture.supplyAsync(() -> execute(each), executor))
               .flatMap(x -> Stream.of(x).map(CompletableFuture::join))
               .flatMap(Set::stream)
               .collect(Collectors.toSet());
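In a sequential stream, each element travels through the whole pipeline before the next one is pulled, so a `join()` in the same pipeline as `supplyAsync` waits for each task before the next one is submitted. A variant that makes the ordering explicit collects all futures into a list first and only then joins them. This is a sketch with a hypothetical `execute(int)` and an arbitrary pool size of 4, not the answer's code:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class TwoPhase {

    // Hypothetical stand-in for execute(task)
    static Set<Integer> execute(int n) {
        return Set.of(n * 10, n * 10 + 1);
    }

    static Set<Integer> getTasks(List<Integer> inputs) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Phase 1: the terminal collect() submits every task before anything is joined
            List<CompletableFuture<Set<Integer>>> futures = inputs.stream()
                    .map(n -> CompletableFuture.supplyAsync(() -> execute(n), executor))
                    .collect(Collectors.toList());
            // Phase 2: all tasks are running now; wait for them and flatten the results
            return futures.stream()
                    .map(CompletableFuture::join)
                    .flatMap(Set::stream)
                    .collect(Collectors.toSet());
        } finally {
            executor.shutdown();
        }
    }
}
```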
Eugene
  • I get a compiler error when using your code: `Cannot infer type argument(s) for <R> flatMap(Function<? super T,? extends Stream<? extends R>>)` – nimo23 Feb 19 '21 at 22:47
  • Ok, the code compiles when I add type information: `Stream.of(x).map(CompletableFuture<Set<Task>>::join)` – nimo23 Feb 19 '21 at 22:58
  • Your code does not use `CompletableFuture.allOf()`, so I am unsure whether this is the proper way. According to https://stackoverflow.com/questions/35809827/java-8-completablefuture-allof-with-collection-or-list, with `CompletableFuture::join` we wait for all futures to complete, whereas `CompletableFuture.allOf()` just returns a new future that completes once all contained futures have completed. I think that with your approach, if one of your futures raises an exception, your code will not wait for the other futures to complete and kills the other tasks. – nimo23 Feb 19 '21 at 23:10
  • I tried your code and if one task raises an exception, then all tasks are silently stopped. – nimo23 Feb 20 '21 at 00:31
  • I will not use your version, as I don't see any benefit compared to my solution (version X), which I posted as an answer below. Feel free to comment with any doubts. Thanks. – nimo23 Feb 20 '21 at 00:52
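Regarding the exception handling discussed in these comments: `allOf` itself cancels nothing. Per its documentation, the returned future completes when all the given futures complete, and completes exceptionally if any of them did. A sketch, assuming a hypothetical `execute(int)` that throws for negative input and an arbitrary pool size of 4, that waits for every task and then separates successful results from failures:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KeepSuccesses {

    // Hypothetical task: throws for negative input to simulate a failing task
    static Set<Integer> execute(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("bad input " + n);
        }
        return Set.of(n * 10, n * 10 + 1);
    }

    // Returns the results of all successful tasks; failures are appended to the given list
    static Set<Integer> getTasks(List<Integer> inputs, List<Throwable> failures) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            var futures = new ArrayList<CompletableFuture<Set<Integer>>>();
            for (var n : inputs) {
                futures.add(CompletableFuture.supplyAsync(() -> execute(n), executor));
            }
            try {
                // Completes only after every future has completed, normally or not
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            } catch (CompletionException e) {
                // At least one task failed, but all of them have finished running
            }
            var results = new LinkedHashSet<Integer>();
            for (var f : futures) {
                try {
                    results.addAll(f.join()); // already complete: returns or throws immediately
                } catch (CompletionException e) {
                    failures.add(e.getCause()); // the exception thrown inside execute()
                }
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```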

After trying all the versions above, I have come to the conclusion that the following solution is the best:

// VERSION X is the best
public Set<Task> getTasks() {
    var executor = Executors.newCachedThreadPool();
    var futures = new ArrayList<Future<Set<Task>>>();
    var tasks = new LinkedHashSet<Task>();
    for (var t : user.getTasks()) {
        // submit every task first, so they all run in parallel
        futures.add(executor.submit(() -> execute(t)));
    }
    for (var f : futures) {
        try {
            // blocks until this task's result is available
            tasks.addAll(f.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    executor.shutdown();
    return tasks;
}

It's the best because:

  • simple, fast code (no unneeded overhead from lambdas, CompletableFuture, ...)
  • no exception is suppressed
  • one failing task does not stop the execution of the other tasks
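For comparison, `ExecutorService.invokeAll` expresses the same submit-everything-then-wait idea in a single call: it runs the given `Callable`s and returns their `Future`s once all of them are done, whether they completed normally or threw. A sketch in the spirit of version X, with a hypothetical `execute(int)` that throws for negative input and an arbitrary pool size of 4:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class InvokeAllVersion {

    // Hypothetical stand-in for execute(task); throws for negative input
    static Set<Integer> execute(int n) {
        if (n < 0) {
            throw new IllegalStateException("bad input " + n);
        }
        return Set.of(n * 10, n * 10 + 1);
    }

    static Set<Integer> getTasks(List<Integer> inputs) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Set<Integer>>> jobs = inputs.stream()
                    .map(n -> (Callable<Set<Integer>>) () -> execute(n))
                    .collect(Collectors.toList());
            // Blocks until every job has finished, successfully or not
            List<Future<Set<Integer>>> futures = executor.invokeAll(jobs);
            var results = new LinkedHashSet<Integer>();
            for (var f : futures) {
                try {
                    results.addAll(f.get()); // does not block: every future is already done
                } catch (ExecutionException e) {
                    System.err.println("task failed: " + e.getCause());
                }
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```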

If anyone can convince me to use one of the other versions, please add your arguments.

nimo23
  • This isn't about "add arguments". If you had asked a clear question to begin with, with clear requirements, this would have been easy to answer. Since you did not (for example, the case "if a task fails"), you are where you are now. If you are happy with this solution, use it. – Eugene Feb 20 '21 at 00:58
  • Yes, it was not clear to me that CompletableFutures behave so differently in different cases, for example suppressing exceptions by default and stopping the queue. Anyway, thanks for helping. – nimo23 Feb 20 '21 at 01:03
  • no worries, at all. Next time a better crafted question will trigger better insights. – Eugene Feb 20 '21 at 01:07