93

I am trying to convert List<CompletableFuture<T>> to CompletableFuture<List<T>>. This is quite useful when you have many asynchronous tasks and need the results of all of them.

If any of them fails, then the final future fails. This is how I have implemented it:

public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
    if(com.isEmpty()){
        throw new IllegalArgumentException();
    }
    Stream<? extends CompletableFuture<T>> stream = com.stream();
    CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
    return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
        x.add(y);
        return x;
    },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
        ls1.addAll(ls2);
        return ls1;
    },exec));
}

To run it:

ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep((long) (Math.random() * 10));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

If any of them fails, then it fails. It gives the expected output even if there are a million futures. The problem I have is this: if there are more than 5000 futures and any of them fails, I get a StackOverflowError:

Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
    at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
    at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

What am I doing wrong?

Note: The returned future above fails as soon as any of the futures fails. The accepted answer should also preserve this behavior.

Didier L
Jatin
  • 1
    If I were you I'd implement a `Collector` instead... – fge May 04 '15 at 08:13
  • @fge That is actually a very good suggestion. I am coming from scala world where we have a similar thing. Collector might be a better fit here. But then the implementation i suppose might be similar. – Jatin May 04 '15 at 08:19

9 Answers

108

Use CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}
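
As a quick check of how this behaves, the following self-contained sketch (the class name SequenceDemo and the sample values are illustrative only) runs sequence on futures that complete out of order, and on a list containing one failed future. Results come back in list order, and a failure surfaces from join() as a CompletionException wrapping the original cause.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

final class SequenceDemo {

    // Same approach as above: wait for everything, then read each result.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> first = new CompletableFuture<>();
        CompletableFuture<Integer> second = new CompletableFuture<>();
        CompletableFuture<List<Integer>> all = sequence(Arrays.asList(first, second));

        // Complete the futures out of order; the result keeps list order.
        second.complete(2);
        first.complete(1);
        System.out.println(all.join());

        // One failed input makes the combined future fail as well.
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            sequence(Arrays.asList(CompletableFuture.completedFuture(1), failed)).join();
        } catch (CompletionException e) {
            System.out.println("failed with: " + e.getCause());
        }
    }
}
```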

A few comments on your implementation:

Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync methods without a good reason.

Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect instead.
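
To illustrate the difference, here is a minimal sketch (the class and method names are made up for this example) of a mutable reduction written with the three-argument collect. Each chunk of a parallel stream gets its own container from the supplier, and the chunks are merged afterwards, so no list is shared between threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class MutableReductionDemo {

    // collect(supplier, accumulator, combiner): a fresh list per chunk,
    // merged by the combiner, so this stays correct on a parallel stream.
    static List<Integer> collectRange(int n) {
        return IntStream.range(0, n)
                .parallel()
                .boxed()
                .collect(ArrayList::new, List::add, List::addAll);
    }

    public static void main(String[] args) {
        System.out.println(collectRange(5));
    }
}
```

Doing the same with reduce and a single shared ArrayList as the identity would have every parallel chunk append to that one list.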

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow(); right after result.completeExceptionally(ex);. This, of course, assumes that exec exists only for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future individually.
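
A minimal sketch of that last variant, cancelling each remaining future individually instead of shutting down the executor. The class and method names are made up for illustration. Note that CompletableFuture.cancel only completes the future with a CancellationException; it does not interrupt a task that is already running, so this stops dependents rather than in-flight work.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastSequence {

    static <T> CompletableFuture<List<T>> sequenceFailFast(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result =
                CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                        .thenApply(v -> com.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList()));

        com.forEach(f -> f.whenComplete((t, ex) -> {
            // completeExceptionally returns true only for the call that actually
            // completed 'result', so the cancel loop below runs at most once and
            // the callbacks triggered by cancel() do not recurse.
            if (ex != null && result.completeExceptionally(ex)) {
                com.forEach(other -> other.cancel(false)); // no-op on finished futures
            }
        }));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<List<Integer>> all = sequenceFailFast(Arrays.asList(a, b));

        b.completeExceptionally(new IllegalStateException("boom"));
        System.out.println("result failed: " + all.isCompletedExceptionally());
        System.out.println("a cancelled:   " + a.isCancelled());
    }
}
```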

Misha
  • 1
    One thing that I don't understand is, `allOf`'s return type is `CompletableFuture<Void>` and we return `CompletableFuture<List<T>>` without any compiler warning. I was not aware of this nature of void – Jatin May 04 '15 at 10:10
  • In my case it is perfectly fine to use ArrayList because the addition operation cannot be performed in parallel. Only when the previous one is completed we add an element, and so on. So the addition will never be performed in parallel. – Jatin May 04 '15 at 10:11
  • See the `.thenApply` part. After `allOf(...)` completes successfully, it collects all the resulting values into a list. – Misha May 04 '15 at 10:13
  • I agree with most of the answer except the `ArrayList` part. it is completely safe to use it in my code. Else this is a better code than mine (just that you iterate over the list multiple times. which is ok given the verbosity lost). – Jatin May 04 '15 at 10:24
  • 1
    @Jatin I think you might be right about that. I will rethink it in the morning when I'm more awake and modify my answer accordingly. – Misha May 04 '15 at 10:31
  • 1
    @Jatin You are right, within the current implementation of `reduce`, so long as the stream in the `sequence2` method is kept sequential, ArrayList is safe. However, it is very undesirable to write stream constructs that break if stream were made parallel. In the very least, if you rely on the stream being sequential, the 3rd argument to `reduce` should be `(a, b) -> {throw new IllegalStateException("Parallel not allowed");}` – Misha May 04 '15 at 22:20
  • A good point! Thanks. I think sequence2 was a bad idea. A collector is certainly better and so is your answer. – Jatin May 05 '15 at 08:39
  • There is one flaw here. `CompletableFuture.allOf` only completes when all complete. Even if any amongst them fails, it still waits for others for response. – Jatin May 15 '15 at 10:39
  • 1
    That is exactly how your original solution (using `thenCombine`) would behave. If you want to short-circuit the computation and trigger exceptional completion immediately, it's easy to do. See updated answer. – Misha May 16 '15 at 22:10
  • For completion :-P i think this code needs a import static java.util.stream.Collectors.toList – jneira Nov 24 '15 at 10:57
  • This does not compile: `Type mismatch: cannot convert from CompletableFuture to CompletableFuture>`. JDK 8u66 – Dirk Hillbrecht Jan 10 '16 at 17:31
  • @Misha I don't understand the benefit of using `allOf` in your code. The individual tasks are invoked sequentially using `join`, so task `i + 1` is not invoked until task `i` is complete. I didn't find anything in the documentation that allows for invoking all the subtasks parallely. The closest thing seems to be `ForkJoinPool .invokeAll` that takes a bunch of `Callable`. – Abhijit Sarkar Jan 18 '16 at 05:45
  • 2
    @AbhijitSarkar The tasks aren't invoked by `join`. The benefit of using `allOf` is that when `allOf` triggers, all the tasks have been completed and `join` just gets the results. – Misha Jan 19 '16 at 20:11
  • 1
    @DirkHillbrecht are you using eclipse? – Misha Jan 19 '16 at 20:11
  • @Misha Could you elaborate on when are the individual tasks invoked and how is the max concurrency controlled? If I submit hundreds of tasks using `allOf`, there's no way all of them could be invoked concurrently as the system just wouldn't have enough resources for that, besides completely defeating the purpose of thread pooling. – Abhijit Sarkar Jan 20 '16 at 01:06
  • @AbhijitSarkar That is out of the scope of this method. The assignment to thread pools is done when individual `CompletableFuture`s were created. It's entirely possible that all the tasks were scheduled in a single thread and the whole thing will run sequentially. It's also possible that they were scheduled in a giant thread pool and everything will run concurrently. Either way, `allOf` will ensure that all tasks have completed before it triggers. – Misha Jan 20 '16 at 01:55
  • @Misha, Yes, Eclipse Mars.1 using JDK 8u66. – Dirk Hillbrecht Jan 20 '16 at 07:00
  • @Misha If we provide this function with a list of completable futures backed by a thread (created with `supplyAsync`), and one of the futures fail, then what happens to the others? Are they abandoned as "zombie threads"? – cubuspl42 Aug 22 '17 at 11:22
  • Wouldn't a direct `CompletableFuture.supplyAsync(() -> com.parallelStream().map(CompletableFuture::join).collect(toList), exec)` be just enough? – charlie Feb 13 '18 at 13:45
  • I know its a bit old thread. I have tried to execute the mentioned solution in my windows eclipse environment. However, every time I execute, it exit on the first occurence of exception. Is it because of `result.completeExceptionally(ex)` get executed and kills all the remaining threads.If yes, what can i do to ensure remaining of the threads get executed as well. – NIGAGA Aug 22 '19 at 17:39
  • 1
    @NIGAGA The version of the code with `result.completeExceptionally` is specifically meant to complete after the first failure (see the description before the code listing). The first version with just `allOf` will wait for all tasks to complete (successfully or not) before completing the future. If this doesn't help you, please post a new question and include your code. – Misha Aug 22 '19 at 19:15
  • @Misha Thanks I understand now. – NIGAGA Aug 22 '19 at 20:02
  • Recommended: [Arrays of Wisdom of the Ancients](https://shipilev.net/blog/2016/arrays-wisdom-ancients/). In short, use `toArray(new CompletableFuture<?>[0])`. It’s simpler and less error prone *and* more efficient on the most widespread JVM implementation… – Holger Nov 29 '19 at 14:28
12

You can get Spotify's CompletableFutures library and use its allAsList method. I think it's inspired by Guava's Futures.allAsList method.

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {

And here is a simple implementation if you don't want to use a library:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
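
If you want the hand-rolled version to accept the same broader input type as the library signature above, one possible sketch follows. This is an assumption about how such a method could be written, not Spotify's actual implementation, and the class name Stages is made up. It relies on CompletionStage.toCompletableFuture(), which custom CompletionStage implementations are permitted not to support.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class Stages {

    static <T> CompletableFuture<List<T>> allAsList(
            List<? extends CompletionStage<? extends T>> stages) {
        // Convert once up front; toCompletableFuture() may throw
        // UnsupportedOperationException for non-interoperable stages.
        List<CompletableFuture<? extends T>> futures = new ArrayList<>(stages.size());
        for (CompletionStage<? extends T> stage : stages) {
            futures.add(stage.toCompletableFuture());
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    // Every future is done here, so join() does not block.
                    List<T> results = new ArrayList<>(futures.size());
                    for (CompletableFuture<? extends T> f : futures) {
                        results.add(f.join());
                    }
                    return results;
                });
    }

    public static void main(String[] args) {
        List<CompletionStage<Integer>> stages = Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2));
        List<Integer> values = allAsList(stages).join();
        System.out.println(values);
    }
}
```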
oskansavli
11

As Misha has pointed out, you are overusing …Async operations. Further, you are composing a complex chain of operations modelling a dependency which doesn’t reflect your program logic:

  • you create a job x which depends on the first and second job of your list
  • you create a job x+1 which depends on job x and the third job of your list
  • you create a job x+2 which depends on job x+1 and the 4th job of your list
  • you create a job x+5000 which depends on job x+4999 and the last job of your list

Then, canceling (explicitly or due to an exception) this recursively composed job might be performed recursively and might fail with a StackOverflowError. That’s implementation-dependent.

As already shown by Misha, there is a method, allOf which allows you to model your original intention, to define one job which depends on all jobs of your list.

However, it’s worth noting that even that isn’t necessary. Since you are using an unbounded thread pool executor, you can simply post an asynchronous job collecting the results into a list and you are done. Waiting for the completion is implied by asking for the result of each job anyway.

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

Using methods for composing dependent operations is important when the number of threads is limited and the jobs may spawn additional asynchronous jobs, to avoid having waiting jobs steal threads from jobs which have to complete first, but neither is the case here.

In this specific case, one job simply iterating over this large number of prerequisite jobs and waiting if necessary may be more efficient than modelling this large number of dependencies and having each job notify the dependent job about its completion.

Community
Holger
  • 2
    One caveat is that using `supplyAsync` instead of `allOf` will consume a thread from the pool to await the completion of all tasks. If I'm not mistaken, `allOf` will operate within the threads assigned to respective tasks. Not a big deal for most use cases, but worth noting. – Misha May 04 '15 at 21:10
  • 1
    @Misha: I *did* mention that it will steal a thread if the number of threads is limited and that it works here because an unlimited thread pool executor is used (and no async sub-jobs are spawned). – Holger May 05 '15 at 08:50
  • @Holger A problem with this answer is that: If any of the later future fails, it still waits for one it is joined upon to complete. Rather, as soon as something gets failed, the returned future should be failed right then. – Jatin May 15 '15 at 10:45
  • Actually, I am even alright with this fact. but not thread stealing. – Jatin May 15 '15 at 11:15
6

To add to the accepted answer by @Misha, it can be further expanded as a collector:

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

Now you can:

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
Jatin
5

An example sequence operation using thenCombine on CompletableFuture

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = 
            (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});

    BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;  

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);  

}

If you don't mind using third-party libraries, cyclops-react (I am the author) has a set of utility methods for CompletableFutures (and Optionals, Streams, etc.):

  List<CompletableFuture<String>> listOfFutures;

  CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);
John McClean
1

Disclaimer: This will not completely answer the initial question. It will lack the "fail all if one fails" part. However, I can't answer the actual, more generic question, because it was closed as a duplicate of this one: Java 8 CompletableFuture.allOf(...) with Collection or List. So I will answer here:

How to convert List<CompletableFuture<V>> to CompletableFuture<List<V>> using Java 8's stream API?

Summary: Use the following:

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
    CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

    BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
        futureValue.thenCombine(futureList, (value, list) -> {
                List<V> newList = new ArrayList<>(list.size() + 1);
                newList.addAll(list);
                newList.add(value);
                return newList;
            });

    BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
        List<V> newList = new ArrayList<>(list1.size() + list2.size());
        newList.addAll(list1);
        newList.addAll(list2);
        return newList;
    });

    return listOfFutures.stream().reduce(identity, accumulator, combiner);
}

Example usage:

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
    .mapToObj(i -> loadData(i, executor)).collect(toList());

CompletableFuture<List<String>> futureList = sequence(listOfFutures);

Complete Example:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class ListOfFuturesToFutureOfList {

    public static void main(String[] args) {
        ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
        test.load(10);
    }

    public void load(int numThreads) {
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
            .mapToObj(i -> loadData(i, executor)).collect(toList());

        CompletableFuture<List<String>> futureList = sequence(listOfFutures);

        System.out.println("Future complete before blocking? " + futureList.isDone());

        // this will block until all futures are completed
        List<String> data = futureList.join();
        System.out.println("Loaded data: " + data);

        System.out.println("Future complete after blocking? " + futureList.isDone());

        executor.shutdown();
    }

    public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            System.out.println("Starting to load test data " + dataPoint);

            try {
                Thread.sleep(500 + rnd.nextInt(1500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Successfully loaded test data " + dataPoint);

            return "data " + dataPoint;
        }, executor);
    }

    private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
        CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

        BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
            futureValue.thenCombine(futureList, (value, list) -> {
                    List<V> newList = new ArrayList<>(list.size() + 1);
                    newList.addAll(list);
                    newList.add(value);
                    return newList;
                });

        BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
            List<V> newList = new ArrayList<>(list1.size() + list2.size());
            newList.addAll(list1);
            newList.addAll(list2);
            return newList;
        });

        return listOfFutures.stream().reduce(identity, accumulator, combiner);
    }

}
Kai Stapel
  • You should use `thenCombine()` instead of `thenApply()` in the accumulator, to avoid the `join()` call. Otherwise the calling thread will actually execute that, so the collection will only return after everything has completed. You can check this by adding a print before the `futureList.join()`: it only gets printed after all futures have printed “_Succesfully loaded test data_”. – Didier L May 18 '18 at 11:52
  • @DidierL If I change `thenApply()` to `thenCombine()` then the final `join()` call to the `CompletableFuture<List<V>>` will not block anymore but return immediately with an empty result. So the future of list will not wait until all individual futures are complete. But that was the initial idea of the whole thing. – Kai Stapel May 18 '18 at 14:21
  • Yes, indeed, I forgot that a `Collector` relies on mutation. The problem with your code is that it is equivalent to `CompletableFuture.completedFuture(listOfFutures.stream().map(CompletableFuture::join).collect(toList()));`. The collection is actually returning a future that is already completed, so there is no point in returning a future any more. – Didier L May 18 '18 at 14:40
  • You may be correct that this is functionally equivalent to my "complete example". However, the example is just for illustrating purposes on how to use the `toFutureList()` collector. What is not equivalent is `listOfFutures.stream().map(CompletableFuture::join).collect(toList())` and `listOfFutures.stream().collect(toFutureList())`. The former gives you a complete result with all futures completed, while the latter gives you a future of a list of values that you can pass on, or map to other values without blocking. – Kai Stapel May 18 '18 at 16:25
  • That's where you are wrong: the latter does exactly the same. Your collector simply calls `join()` on all futures on the calling thread, and wraps the result in an already completed `CompletableFuture`. **It is blocking.** As I said previously, just add a print right after the stream collection and you will see that this print will only occur after all futures are completed. – Didier L May 18 '18 at 17:07
  • Yes, you are right. I updated my answer accordingly. Thanks for pointing that out. – Kai Stapel May 23 '18 at 12:06
1

Your task could be done easily as follows:

final List<CompletableFuture<Module>> futures = ...
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();
Janitha Madushan
0

In addition to the Spotify Futures library, you might try my code, located here: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (it has dependencies on other classes in the same package)

It implements logic to return "at least N out of M" CompletionStage-s with a policy for how many errors it is allowed to tolerate. There are convenient methods for the all/any cases, plus a cancellation policy for the remaining futures, plus the code deals with CompletionStage-s (interface) rather than CompletableFuture (concrete class).

Valery Silaev
0

Javaslang has a very convenient Future API. It also lets you make a future of a collection out of a collection of futures.

List<Future<String>> listOfFutures = ... 
Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);

See http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

Mathias Dpunkt