3

Please, consider an example from the "Modern Java in Action" book (2nd Edition, listing 16.16, page 405). There, we have three map operation to obtain the list of discounted prices for the product from all the shops in a stream. First, we contact each shop to get a response containing non-discounted price along with a discount type, than parse the response into Quote object, and pass it over to the remote Discount service which returns a string with already discounted price.

public List<String> findPrices(String product) {

    List<CompletableFuture<String>> priceFutures =
        shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor)))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

My question is not about the difference between thenApply and thenCompose. I believe, the latter is used to avoid nested construction like CompletableFuture<CompletableFuture<...>>. What I don't understand, though, why we need to create another level of CompletableFuture here at all? It seems like the author added some artificial complexity to the code by creating and then flattening nested CompletableFuture, instead of simply using thenApplyAsync in the third map like this:

            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

Are those two mapping usages (the original with thenCompose and the one with thenApplyAsync) equivalent? Both accept the result of the previous mapping as an argument, both provide custom executor to perform the task, and both return the same CompletableFuture<String> result.

escudero380
  • 536
  • 2
  • 7
  • 25
  • 2
    See [this Q&A](https://stackoverflow.com/q/50854568/2711488), which seems to discuss another aspect of the same chapter (can’t verify whether “listing 16.16, page 405” is the same as 11.4.3). The answer addresses the point that `thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))` is indeed a verbose version of `thenApplyAsync`, and the splitting into multiple `Stream.map` operations doesn’t add to the readability as well. – Holger May 18 '20 at 14:06
  • 2
    You should also avoid calling `CompletableFuture.join` **unless you're certain it will not block**, as it may result in deadlocks (for instance, if `findPrices` is called asynchronously as well and it uses an executor with a bounded number of threads). – kewne May 18 '20 at 14:21

1 Answers1

2

Yes, the thenCompose and supplyAsync achieve the same as using thenApplyAsync directly.

I haven't read the book, but it might be that some example code is focused on some topic or feature rather than the most concise or fastest code. As such, I leave some suggestions assuming you're considering using similar code.


One more suggestion about this code is that it's kind of weird to chain each CompletableFuture through successive calls to map. It seems the current example was built on top of a previous Stream based method with multiple calls, and left as-is but with the use of CompletableFuture.

I prefer one single map and chaining each CompletableFuture directly, which also allows refactoring it out into a method of its own.

So this:

            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

Would become this:

            .map(shop ->
                CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
                .thenApply(Quote::parse)
                .thenApplyAsync(Discount::applyDiscount, executor))

This lambda is easily turned into a method, which can be reused in cases without a Stream, it can be composed with another CompletableFuture, it can be tested, it can be mocked, etc.


Another suggestion would be to make your code asynchronous all the way, such that findPrices doesn't block on join (or get, for that matter).

The problem with blocking is that it might block the last available thread on an executor, thus provoking a deadlock by thread exhaustion. The asynchronous code on which your code depends on, that eventually needs to run on the executor, might never run.

public CompletableFuture<List<String>> findPricesAsync(String product) {
    // List<CompletableFuture<String>> priceFutures = ...

    CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
    return all.thenRun(() -> priceFutures.stream()
        .map(CompletableFuture::join));
}

Note that the return type changed from List<String> to CompletableFuture<List<String>>. Also note that the last call to join will not block, as every CompletableFuture on which it will be called has completed.


Finally, I tend to return CompletionStage, as it allows hypothetical implementations other than CompletableFuture. I also make the assumption that the returned object also implements Future, which allows to use get on the result, but not join, the difference being the declared thrown exception types.

In one case where I made NIO-like methods return CompletionStage for asynchronous I/O, I've implemented a subclass of CompletableFuture overriding the default executor used in each *Async method that doesn't have the executor parameter. This was made easier since Java 9, still by subclassing, but it only needs overriding defaultExecutor. The main reason I subclassed was that the alternative using composition would result in much more code (wrapping results and what not). Another reason, but not what really worried me, was having an extra object to be garbage collected for each instance.

This was just to demonstrate that there may be cases where having custom CompletionStage implementations is actually needed, which might or might not be subclasses of CompletableFuture.

acelent
  • 7,965
  • 21
  • 39