1

I want to make parallel calls to different external services and proceed with the first successful response.

Successful here means that the result returned by the service has certain values in certain fields.

However, for CompletableFuture, everything other than an exception is success. So, even for for business failures, I have to throw an exception to signal non-success to CompletableFuture. This feels wrong, ideally I would want to provide a boolean to indicate business success/failure. Is there a better way to signal business failures?

The second question I have is, how do I make sure I don't run out of threads due to the abandoned CompletableFutures that would keep running even after CompletableFutures.anyOf() returns. Ideally I want to force stop the threads but as per the thread below, the best I can do is cancel the downstream operations.

How to cancel Java 8 completable future?

When you call CompletableFuture#cancel, you only stop the downstream part of the chain. Upstream part, i. e. something that will eventually call complete(...) or completeExceptionally(...), doesn't get any signal that the result is no more needed.

I can force stop treads by providing my own ExecutorService and calling shutdown()/shutdownNow() on it after CompletableFuture.anyOf() returns. However I am not sure about the implications of creating a new ExecutorService instance for each request.

Dojo
  • 5,374
  • 4
  • 49
  • 79
  • 1
    Maybe you shouldn't tie your design so tightly into `CompatibleFutures`. It's an implementation detail after all, not an architecture. – Kayaman May 24 '22 at 06:00
  • 1
    You are overestimating the effect of an executor service. Calling `shutdown()` has no effect on the ongoing evaluations at all and even `shutdownNow()` only affects operations actively supporting interruption. What’s wrong with signaling a “business failure” with an exception? – Holger May 24 '22 at 08:49
  • One doesn't generally use exceptions to signal outcomes that are commonly expected. Exceptions should occur in exceptional cases. Otherwise they would be called the Norm. In absence of a better solution, I don't mind using it, just that, its not my first choice. – Dojo May 24 '22 at 09:23
  • 1
    I have some doubts about your business when “business failure” is the “commonly expected” outcome. However, since the exceptional completion has no special meaning to `anyOf`, you can model your “business failure” result however you like, value result or exceptional result, it doesn’t really matter. – Holger May 24 '22 at 11:13
  • Business Failure = when application logic yields false. I guess its more palatable now. – Dojo May 25 '22 at 03:33
  • 1
    As said, to `anyOf`, it doesn’t matter how you encode this result. It will use whatever evaluated faster, whether the result is exceptional or an ordinary `false`. So there is no need to replace your application logic result of `false` with an exception, as it won’t bring you closer to the desired outcome anyway. The other point you didn’t respond to, is the question whether the underlying operation would respond to interruption at all; if you spend efforts in implementing cancelation, there should be an effect on the operation, as otherwise, you just wasted your time. – Holger Jun 01 '22 at 08:02

1 Answers1

0

This is perhaps an attempt to look away from anyOf(), as there's just no easy way to cancel other tasks with it.

What this is doing is create and start async tasks, then keep a reference to the future along with an object that would be used to effectively terminate other tasks, which of course is dependent on your actual code/task. In this example, I'm just returning the input string (hence the type Pair<CompletableFuture<String>, String>); your code will probably have something like a request object.

ExecutorService exec = Executors.newFixedThreadPool(5);
List<String> source = List.of();

List<Pair<CompletableFuture<String>, String>> futures = source.stream()
        .map(s -> Pair.of(CompletableFuture.supplyAsync(() -> s.toUpperCase(), exec), s))
        .collect(Collectors.toList());

Consumer<Pair<CompletableFuture<String>, String>> cancelOtherTasksFunction = pair -> {
    futures.stream()
            .filter(future -> future != pair)
            .forEach(future -> {
                future.getLeft().cancel(true); // cancel the future
                // future.getRight().doSomething() // cancel actual task
            });
};

AtomicReference<String> result = new AtomicReference<>();
futures.forEach(future -> future.getLeft()
        .thenAccept(s -> {
            if (null == result.compareAndExchangeRelease(null, s)) {
                // the first success is recorded
                cancelOtherTasksFunction.accept(future);
            }
        }));

I suppose you can (should?) create a class to hold the future and the task object (such as an http request you could cancel) to replace the Pair.

Note:

  • if (null == result.compareAndExchangeRelease(null, s)) is only valid if your future returns a non-null value.
  • The first valid response will be in result, but you still need to test for blocking before returning it, although I suppose it should work as other tasks are being cancelled (that's the theory).
  • You may decide to make futures.forEach part of the stream pipeline above, just be careful to force all tasks to be submitted (which collect(Collectors.toList()) does).
ernest_k
  • 44,416
  • 5
  • 53
  • 99
  • 1
    Letting aside that `cancel(true)` on a `CompletableFuture` has no effect on the already ongoing computation (as already mentioned in the question), the code to cancel the other futures is unnecessary complicated. You don’t need to track which future has been completed, as an already completed future will ignore any subsequent calls to cancel anyway. So you can simply call cancel on all futures as soon as one completed and get the same effect. – Holger May 24 '22 at 08:58
  • Right, @Holger, canceling the futures can be simplified. The answer's idea around canceling the task is in the second value of the Pair object (which depends on actual code, I use string as placeholder). Maybe I didn't explain it clearly though :) – ernest_k May 24 '22 at 09:12