This is my stream:

Stream<CompletableFuture<String>> futureStream = IntStream
    .iterate(1, n -> n < resultSet.getTotalCount() / pageSize, n -> n + 1) // n++ would return the old value and never advance
    .mapToObj(pageNumber -> this.buildCompletableFutureofResultSetType(oid, pageNumber, pageSize));

CompletableFuture has an allOf method:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

I took a look at the allOf source code:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

I know I can first collect all of them in a List<CompletableFuture> and then pass it to allOf.

However, I'd like to chain this collecting step into the stream's functional chain.

Any ideas?

Jordi
  • Only thing I can think of is: `Collectors.collectingAndThen(Collectors.toList(), list -> CompletableFuture.allOf(list.toArray(CompletableFuture[]::new)))`, but that still collects it into a list – Lino Jun 14 '21 at 13:30
  • You can implement your own `Collector` and pass it to the `.collect()` of your stream in order to retrieve directly a `CompletableFuture` corresponding to your `allOf()`. But I think it's a lot of work and I guess you want to chain it into the stream just for readability, so I don't think it's worth the effort. – Matteo NNZ Jun 14 '21 at 13:32
  • Another option would be to use: `.reduce(CompletableFuture.completedFuture(null), CompletableFuture::allOf)`, but I don't exactly know how that would behave (would need some testing) – Lino Jun 14 '21 at 13:55

1 Answer

However, I'd like to chain this collecting step into the stream's functional chain.

No, you don't.

The andTree method tries to build a balanced tree. It is not exactly trivial to try to construct a balanced tree on your own, and may not even be possible (the fact that CompletableFuture can build balanced trees is an implementation detail, in the sense that the API doesn't expose any of this, and consequently you can't use it; these are all private/package-private methods).

Collectors, meanwhile, are quite complicated; they need to be able to balance and combine, on the fly, a potentially parallel stream of incoming data whose ordering and sorting properties vary. Hence, writing your own collector is possible, but it is very easy to shoot yourself in the foot when you do that.

So, if you want to 'directly' chain your stream of futures into a single future that completes when all elements in the stream complete, it's extremely complicated and will result in an unbalanced tree, which is highly likely to be less efficient than accepting the overhead of going via an intermediate list.
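To make the 'unbalanced tree' point concrete, here is a minimal sketch of what a direct reduction along the lines of the `.reduce(...)` idea from the comments might look like. The class name `ReduceChainSketch` and the helper `page` are made up for illustration, and the three-argument `reduce` is used because the stream elements are `CompletableFuture<String>` while `allOf` returns `CompletableFuture<Void>`. Each step wraps the previous result, so n futures end up in a chain n levels deep.

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ReduceChainSketch {

    // Hypothetical stand-in for the per-page future built in the question.
    static CompletableFuture<String> page(int pageNumber) {
        return CompletableFuture.supplyAsync(() -> "page-" + pageNumber);
    }

    // Folds the stream left to right: every step wraps the previous result,
    // so n futures yield a chain n levels deep rather than a balanced tree.
    static CompletableFuture<Void> allDone(Stream<CompletableFuture<String>> futures) {
        CompletableFuture<Void> seed = CompletableFuture.completedFuture(null);
        return futures.reduce(
                seed,
                (acc, next) -> CompletableFuture.allOf(acc, next),
                (left, right) -> CompletableFuture.allOf(left, right));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> all =
                allDone(IntStream.rangeClosed(1, 5).mapToObj(ReduceChainSketch::page));
        all.join(); // blocks until every page future has completed
        System.out.println(all.isDone());
    }
}
```

It works, but it buys nothing over the intermediate list and gives up the balancing that allOf does internally.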

How many futures are we talking about? If the answer is 'less than 10000', the cost of that intermediate list is effectively nil. If the answer is 'more than 10000', perhaps the costs of that list are problematic, but having an unbalanced tree for those futures is vastly more problematic. Either way, directly collecting the stream is the wrong answer.

Thus, what you want:

streamOfFutures.collect(Collectors.collectingAndThen(Collectors.toList(), list -> CompletableFuture.allOf(list.toArray(CompletableFuture[]::new))));

// or possibly, as it's much cleaner and shorter...
// Remember, functional style / streams are a tool, not some sort of magic
// incantation that guarantees elegant code!

CompletableFuture.allOf(streamOfFutures.toArray(CompletableFuture[]::new));
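For completeness, a self-contained sketch of the list-based variant that also hands back the page contents once everything is done. `AllOfPagesSketch`, `fetchPage` and `allPages` are placeholder names, not anything from the question's code base, and `fetchPage` merely fakes the real lookup. It uses `toArray(new CompletableFuture[0])`, so it should also compile on Java 8.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class AllOfPagesSketch {

    // Hypothetical stand-in for the question's per-page future.
    static CompletableFuture<String> fetchPage(int pageNumber) {
        return CompletableFuture.supplyAsync(() -> "page-" + pageNumber);
    }

    // Collects the futures into a list, waits for all of them via allOf,
    // then reads the already-completed results in encounter order.
    static CompletableFuture<List<String>> allPages(Stream<CompletableFuture<String>> futures) {
        List<CompletableFuture<String>> list = futures.collect(Collectors.toList());
        return CompletableFuture
                .allOf(list.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> list.stream()
                        .map(CompletableFuture::join) // does not block: all are done here
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> pages =
                allPages(IntStream.rangeClosed(1, 3).mapToObj(AllOfPagesSketch::fetchPage)).join();
        System.out.println(pages);
    }
}
```

Keeping the list around is what makes it possible to read the individual results afterwards, since allOf itself only yields a `CompletableFuture<Void>`.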
rzwitserloot
  • Note that the `list.toArray(CompletableFuture[]::new)` used in the first variant requires Java 11; for earlier versions you have to stay with `list.toArray(new CompletableFuture[0])`. You can build a balanced tree with the `CompletableFuture` API, as [demonstrated here](https://stackoverflow.com/a/49939668/2711488), but it requires a random access data structure, in other words, is somewhere between very hard and impossible to implement as `Collector`. Ironically, parallel processing makes it easier as the implementation will try to balance for you. The problem is the sequential processing. – Holger Jun 15 '21 at 07:16
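As a rough illustration of the balanced-tree remark above (this is not the code from the linked answer, just a sketch assuming the futures already sit in a random-access `List`; `BalancedAllOfSketch`, `allOfBalanced` and `combine` are invented names), the range can be split recursively in half using nothing but the public `allOf`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class BalancedAllOfSketch {

    // Entry point: completes when every future in the list has completed.
    static CompletableFuture<Void> allOfBalanced(List<? extends CompletableFuture<?>> futures) {
        if (futures.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return combine(futures, 0, futures.size() - 1);
    }

    // Splits the index range [lo, hi] in half and joins the two halves,
    // giving a tree of depth about log2(n). Needs random access by index.
    private static CompletableFuture<Void> combine(
            List<? extends CompletableFuture<?>> futures, int lo, int hi) {
        if (lo == hi) {
            return CompletableFuture.allOf(futures.get(lo));
        }
        int mid = (lo + hi) >>> 1;
        return CompletableFuture.allOf(
                combine(futures, lo, mid),
                combine(futures, mid + 1, hi));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            futures.add(new CompletableFuture<>());
        }
        CompletableFuture<Void> all = allOfBalanced(futures);
        futures.forEach(f -> f.complete("done"));
        all.join();
        System.out.println(all.isDone());
    }
}
```

The need for index-based splitting is exactly why this is awkward to express as a sequential `Collector`, which only ever sees one element at a time.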