I would like to know what the official mechanism is for zipping results with CompletableFuture. So far I have just been using the thenCombine operator. Here is my example:

@Test
public void zip() throws InterruptedException {
    CompletableFuture<Either<Integer, String>> completableFuture = CompletableFuture.supplyAsync(this::getValue);
    CompletableFuture<Either<Integer, String>> completableFuture1 = CompletableFuture.supplyAsync(this::getValue);
    CompletableFuture<Either<Integer, String>> completableFuture2 = CompletableFuture.supplyAsync(this::getValue);

    completableFuture
            .thenCombine(completableFuture1, (c1, c2) -> new Right<>(c1.right().get() + "|" + c2.right().get()))
            .thenCombine(completableFuture2, (c1, c2) -> new Right<>(c1.right().get() + "|" + c2.right().get()))

            .whenComplete((result, throwable) -> System.out.println(result.right().get()));
    Thread.sleep(2000);
}

To me, the thenCombine operator feels more like RxJava's merge operator.

Any idea if there's a better way to do it?

I just want to run three processes in parallel and then zip the results.

Regards.

paul
  • consider a stream of completable futures and reducing that stream? – the8472 Oct 17 '17 at 19:30
  • Sorry, I don't quite get it. Can you provide an example using Stream with CompletableFuture? I have an API that returns CompletableFuture, so I need to zip all of them. Do I understand correctly, then, that CompletableFuture has no operator to zip N processes? – paul Oct 17 '17 at 19:34
  • if you need pairwise N:1 reduction then CompletableFuture does not provide that since it only deals with one or at most two elements (i.e. the pairwise part, not the N part). The whole "operate on N elements" thing is orthogonal to asynchronous execution and provided through collections or streams. Basically you're expecting CFs to provide two features where it is only a building block and the other feature is provided by another one. – the8472 Oct 17 '17 at 20:36
  • Since you tagged RxJava, why don't you use RxJava's `zip`? – akarnokd Oct 17 '17 at 22:07
  • Because the API I'm consuming returns CFs. I can convert one to the other, but my question here was whether CF provides an operator to do zip, and it seems it does not. – paul Oct 17 '17 at 22:09
  • [Related question](https://stackoverflow.com/questions/30025428/listfuture-to-futurelist-sequence)? It sounds like `CompletableFuture.allOf` suits your needs. – concat Oct 17 '17 at 22:33
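
To illustrate the stream-reduction idea from the comments, here is a minimal sketch that folds a stream of futures pairwise with thenCombine. The class and method names (`ReduceDemo`, `joinAll`) and the `"|"` separator are hypothetical and only serve the example; this is one possible reading of the suggestion, not code posted by the commenter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

class ReduceDemo {
    // Hypothetical helper: folds the stream pairwise with thenCombine, so the
    // resulting future completes once every input future has completed.
    static CompletableFuture<String> joinAll(Stream<CompletableFuture<String>> futures) {
        return futures
                .reduce((acc, next) -> acc.thenCombine(next, (a, b) -> a + "|" + b))
                // An empty stream has nothing to combine; fall back to an empty string.
                .orElseGet(() -> CompletableFuture.completedFuture(""));
    }

    public static void main(String[] args) {
        Stream<CompletableFuture<String>> futures = Stream.of(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"));
        // join() blocks until the combined future is done, so no Thread.sleep is needed.
        System.out.println(joinAll(futures).join());
    }
}
```

The futures are started before the reduction, so they still run in parallel; the reduction only wires up how their results are combined.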

1 Answer

There indeed does not seem to exist any direct equivalent of RxJava's zip (e.g. Single.zip) for CompletableFuture. CompletableFuture.allOf comes close, but does not give you any combined response value.

The documentation says:

Otherwise, the results, if any, of the given CompletableFutures are not reflected in the returned CompletableFuture, but may be obtained by inspecting them individually.
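
As a small illustration of that sentence: the future returned by allOf is a `CompletableFuture<Void>`, so the values have to be read back from the original futures once it completes. The class and method names below (`AllOfDemo`, `combine`) are made up for this sketch.

```java
import java.util.concurrent.CompletableFuture;

class AllOfDemo {
    // Waits for both futures through allOf, then inspects each one individually.
    static String combine(CompletableFuture<Integer> a, CompletableFuture<String> b) {
        CompletableFuture<Void> all = CompletableFuture.allOf(a, b);
        // 'all' carries no value of its own; join() on a and b does not block here
        // because allOf only completes after both of them have completed.
        return all.thenApply(ignored -> a.join() + "|" + b.join()).join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "two");
        System.out.println(combine(a, b)); // prints "1|two"
    }
}
```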

Using this, it would not be too hard to build some Single.zip-like wrappers on top of CompletableFuture.allOf. Here is one for the form that takes two CompletableFutures:

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class Zip {
  static <T1, T2, R> CompletableFuture<R> zip(CompletableFuture<T1> t1, CompletableFuture<T2> t2, BiFunction<T1, T2, R> combine) {
    return CompletableFuture.allOf(t1, t2).thenApply(ignored -> combine.apply(get(t1), get(t2)));
  }

  private static <T> T get(CompletableFuture<T> completableFuture) {
    // This will only be used for CompletableFutures that we know have
    // completed with a result.
    try {
      return completableFuture.get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
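
For the two-future case it may be worth comparing this wrapper with the built-in thenCombine, which, as far as I can tell, also waits for both futures and then applies a BiFunction to the two results. The sketch below restates the two-argument zip compactly (using join() instead of the get helper, which is my substitution) and runs both side by side; `PairZipDemo` and the sample values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class PairZipDemo {
    // Compact restatement of the two-argument zip, with join() in place of get().
    static <T1, T2, R> CompletableFuture<R> zip(CompletableFuture<T1> t1,
                                                CompletableFuture<T2> t2,
                                                BiFunction<T1, T2, R> combine) {
        return CompletableFuture.allOf(t1, t2)
                .thenApply(ignored -> combine.apply(t1.join(), t2.join()));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> id = CompletableFuture.supplyAsync(() -> 42);
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "item");

        // Both calls pair the two results once both futures are done.
        String viaZip = zip(id, name, (i, n) -> n + "#" + i).join();
        String viaThenCombine = id.thenCombine(name, (i, n) -> n + "#" + i).join();
        System.out.println(viaZip.equals(viaThenCombine));
    }
}
```

So the wrapper mainly pays off for three or more futures, where chaining thenCombine forces intermediate results.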

Here I used Java's built-in BiFunction interface for the combiner. For variants of larger arity, we need to define our own interface:

interface Function3<T1, T2, T3, R> {
  R apply(T1 t1, T2 t2, T3 t3);
}


class Zip {
  // ...

  static <T1, T2, T3, R> CompletableFuture<R> zip(CompletableFuture<T1> t1, CompletableFuture<T2> t2, CompletableFuture<T3> t3, Function3<T1, T2, T3, R> combine) {
    return CompletableFuture.allOf(t1, t2, t3).thenApply(ignored -> combine.apply(get(t1), get(t2), get(t3)));
  }
}

(Note: I have briefly tested that these methods work, but they are not battle-tested in a production app.)
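
If all futures share one result type, the same allOf approach can be generalized to N futures without defining one interface per arity. The following is a rough sketch under that assumption; `ZipAll` and `zipAll` are hypothetical names, not JDK API, and like the methods above it has not been hardened for production use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

class ZipAll {
    // Hypothetical helper: once every future has completed, collects the results
    // in input order and hands the list to the combiner.
    static <T, R> CompletableFuture<R> zipAll(List<CompletableFuture<T>> futures,
                                              Function<List<T>, R> combine) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> combine.apply(
                        futures.stream()
                                .map(CompletableFuture::join) // already completed, does not block
                                .collect(Collectors.toList())));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"));
        String zipped = zipAll(futures, parts -> String.join("|", parts)).join();
        System.out.println(zipped); // prints "a|b|c"
    }
}
```

If any input future fails, the future returned by allOf completes exceptionally and the combiner is never called.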

skagedal