3

We have an async method:

public CompletableFuture<OlderCat> asyncGetOlderCat(String catName)

Given a list of Cats:

List<Cat> cats;

We like to create a bulk operation that will result in a map between the cat name and its async result:

public CompletableFuture<Map<String, OlderCat>>

We also like that if an exception was thrown from the asyncGetOlderCat, the cat will not be added to the map.

We were following this post and also this one and we came up with this code:

List<Cat> cats = ...

Map<String, CompletableFuture<OlderCat>> completableFutures = cats
            .stream()
            .collect(Collectors.toMap(Cat::getName,
                    c -> asynceGetOlderCat(c.getName())
                         .exceptionally( ex -> /* null?? */  ))
            ));


CompletableFuture<Void> allFutures = CompletableFuture
            .allOf(completableFutures.values().toArray(new CompletableFuture[completableFutures.size()]));

return allFutures.thenApply(future -> completableFutures.keySet().stream()
            .map(CompletableFuture::join) ???
            .collect(Collectors.toMap(????)));

But it is not clear how in the allFutureswe can get access to the cat name and how to match between the OlderCat & the catName.

Can it be achieved?

riorio
  • 6,500
  • 7
  • 47
  • 100
  • Why do you need `CompletableFuture allFutures`? – ernest_k Jun 06 '19 at 10:33
  • ...and what is the implementation of `asyncGetOlderCat`? – Naman Jun 06 '19 at 10:36
  • Something that I didn't understand from the question(correct me if I am wrong) is *map between the cat name and its async result:* to be `CompletableFuture>` and not `Map>` or `Map`. – Naman Jun 06 '19 at 10:48

2 Answers2

1

You are almost there. You don't need to put an exceptionally() on the initial futures, but you should use handle() instead of thenApply() after the allOf(), because if any future fails, the allOf() will fail as well.

When processing the futures, you can then just filter out the failing ones from the result, and rebuild the expected map:

Map<String, CompletableFuture<OlderCat>> completableFutures = cats
        .stream()
        .collect(toMap(Cat::getName, c -> asyncGetOlderCat(c.getName())));

CompletableFuture<Void> allFutures = CompletableFuture
        .allOf(completableFutures.values().toArray(new CompletableFuture[0]));

return allFutures.handle((dummy, ex) ->
        completableFutures.entrySet().stream()
                .filter(entry -> !entry.getValue().isCompletedExceptionally())
                .collect(toMap(Map.Entry::getKey, e -> e.getValue().join())));

Note that the calls to join() are guaranteed to be non-blocking since the thenApply() will only be executed after all futures are completed.

Didier L
  • 18,905
  • 10
  • 61
  • 103
  • Instead of introducing `null`s, just to filter them afterwards, you can omit the `.exceptionally(ex -> null)` and change the filter step to `.filter(entry -> !entry.getValue().isCompletedExceptionally())` – Holger Jun 06 '19 at 14:55
  • @Holger Thanks, this seems better indeed. You still need to handle the exceptional completion of `allFutures` in that case though. I updated my answer accordingly. – Didier L Jun 07 '19 at 08:03
  • Or you replace `.exceptionally(ex -> null) .thenApply(__ -> ` with `.handle((__,ex) -> `. – Holger Jun 07 '19 at 09:43
  • @Holger indeed, thanks. `handle()` didn't immediately come to my mind as we aren't really handling anything but indeed it works as well. – Didier L Jun 07 '19 at 11:54
0

As I get it, what you need is CompletableFuture with all results, the code below does exactly what you need

public CompletableFuture<Map<String, OlderCat>> getOlderCats(List<Cat> cats) {
    return CompletableFuture.supplyAsync(
            () -> {
                Map<String, CompletableFuture<OlderCat>> completableFutures = cats
                        .stream()
                        .collect(Collectors.toMap(Cat::getName,
                                c -> asyncGetOlderCat(c.getName())
                                        .exceptionally(ex -> {
                                            ex.printStackTrace();
                                            // if exception happens - return null
                                            // if you don't want null - save failed ones to separate list and process them separately
                                            return null;
                                        }))
                        );

                return completableFutures
                        .entrySet()
                        .stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> e.getValue().join()
                        ));
            }
    );
}

What it does here - returns future, which creates more completable future inside and waits at the end.

DDovzhenko
  • 1,295
  • 1
  • 15
  • 34