
I'm looking for a solution to the following situation:

  • Two Spring service calls must be made in parallel (one is an existing service call/logic and the second is the new addition).
  • The results should then be merged and returned by the RESTful API.

The happy path should be straightforward. However, when it comes to errors emitted by the services, the following rules should apply:

  • The API fails only when both service calls fail -- this should be thrown from the main thread and not the @Async pool since they are independent threads and don't have access to each other's exception (at least that's my reasoning).

  • If only one of them fails, log the error through another service (asynchronously), and the API returns only the results from a service that was successful -- this can be done from the respective @Async threads.

    @Service
    public class Serv1 implements ServInf {
     @Async("customPool")
     public CompletableFuture<List<Obj>> getSomething(int id) {
       // The service ensures that the list is never null, but it can be empty
       return CompletableFuture.completedFuture(/* calling an external RESTful API */);
     }
    }

    @Service
    public class Serv2 implements ServInf {
     @Async("customPool")
     public CompletableFuture<List<Obj>> getSomething(int id) {
       // The service ensures that the list is never null, but it can be empty
       return CompletableFuture.completedFuture(/* calling another external RESTful API */);
     }
    }
    
    @RestController
    public class MyController {
    
     /** Typical service @Autowired's */
    
     @GetMapping(/* ... */)
     public WrapperObj getById(int id) { // int, to match ServInf.getSomething(int id)
    
         CompletableFuture<List<String>> service1Result =
                 service1.getSomething(id)
                         .thenApply(result -> {
                             if (result == null) { return null; }
                             return result.stream().map(Obj::getName).collect(Collectors.toList());
                         })
                         .handle((result, exception) -> {
                             if (exception != null) {
                                 // Call another asynchronous logging service which should be easy
                                 return null;
                             } else {
                                 return result;
                             }
                         });
    
         CompletableFuture<List<String>> service2Result =
                 service2.getSomething(id)
                         .thenApply(result -> {
                             if (result == null) { return null; }
                             return result.stream().map(Obj::getName).collect(Collectors.toList());
                         })
                         .handle((result, exception) -> {
                             if (exception != null) {
                                 // Call another asynchronous logging service which should be easy
                                 return null;
                             } else {
                                 return result;
                             }
                         });
    
         // Blocking till we get the results from both services
         List<String> result1 = service1Result.get();
         List<String> result2 = service2Result.get();
    
         // Where do I get the exceptions thrown by the services if both fail?
         if (result1 == null && result2 == null) {
             // Signal that the API needs to fail as a whole
             throw new CustomException(/* where to get the messages? */);
         }
    
         /** merge and return the result */
     }
    }
    

My question is: since these services return a list of some object, even if I use CompletableFuture.handle() and check for the existence of an exception, I can't return the exception itself so that the Spring Advice class can capture and handle it (the chain must return a list).

One thing I thought of is to use AtomicReference in order to capture the exceptions and set them within the handle() and use them once the futures are done/complete, e.g.

AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();

.handle((result, exception) -> {
    if (exception != null) {
        ce1.set(exception);
        return null; // This signals that there was a failure
    } else {
        return result;
    }
});

List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();

// Where do I get the exceptions thrown by the services if both fail?
if (result1 == null && result2 == null) {
    // Signal that the API needs to fail as a whole
    throw new CustomException(/* do logic to capture ce1.get().getMessage() + ce2.get().getMessage() */);
}

First, does this sound like a viable solution for these multi-threaded asynchronous calls?

Second, this looks messy, so I was wondering if there is a more elegant way of capturing these exceptions outside of the Spring async pool and dealing with them in the main thread, e.g. combining the exception information and throwing it to the Spring Advice exception handler.

Malvon
  • Since you're in the Spring ecosystem, have you looked into Reactor/webflux? – Thomas Timbul Feb 22 '22 at 15:25
  • `.get()` will throw the exceptions if any, so you could just use a good old try/catch around your `.get()`s and handle those exceptions synchronously. – sp00m Feb 22 '22 at 15:31
  • @ThomasTimbul, Two things: 1) The old service call must stay with `RestTemplate` since the external service call retires in the next year and a half (we don't touch it), 2) The second service call to an external API will be developed with Reactor `WebClient` to make the call, however, there is some logic that needs to be performed after receiving the results from the `WebClient` -- that's why I'm thinking I have to deal with that logic in a separate `@Async` thread for the newer service (please advise if this is incorrect). – Malvon Feb 22 '22 at 15:39
  • @sp00m Would that also capture exceptions thrown by `ServInf.doSomething()`? It seems like [`get()`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#get--) only throws a few exceptions. – Malvon Feb 22 '22 at 15:42
  • @Malvon Any exception thrown while completing the future (i.e. asynchronously, resolving into a failed future) will be wrapped into an ExecutionException, accessible in the catch block with e.getCause() if needed. – sp00m Feb 22 '22 at 15:50
  • @sp00m This indeed provides a workaround by shifting the work from chasing the root cause to the main/joining thread. I think it actually provides more control. – Malvon Feb 22 '22 at 16:36
  • Some more clarifications please: What is the definition of `WrapperObj`? If the happy path involves only one of the results, why are you racing these services against each other? Would it not be preferable to either: load balance (smart?); always prefer one, only calling the other if failed (easiest to implement?); ....? Regarding my earlier comment, you can perform additional logic within additional Reactor transformations (in fact you should, to keep everything reactive and prevent the whole thing seizing up as you run out of threads). – Thomas Timbul Feb 23 '22 at 10:53
  • @ThomasTimbul I break this into two parts: (1) Requirement clarification, (2) Comment on Reactor. For (1), the goal is to make calls to two external services, retrieve information from both sources, and finally merge them. These are two distinct systems with their own response structures and logic that needs to be run against their respective responses. If one fails, we still want to return results from the other. Only if both fail, that's when the API fails altogether. `WrapperObj` is just a wrapper for the merged lists coming from these services (RESTful JSON). – Malvon Feb 23 '22 at 13:40
  • @ThomasTimbul (2) The existing service is written in `RestTemplate` and won't get updated as it will retire in a year plus. And the only reason the new service call will use `WebClient` is because of deprecation of `RestTemplate`. The call is going to be blocking (I know, dumb), as the entire stack is blocking (Servlet, Spring MVC, external services are not `WebFlux` and block too). The logics are extensive, so not sure how well they fit into these transformations (sorry, don't know much about Reactive). – Malvon Feb 23 '22 at 13:45
  • @ThomasTimbul (2) Cont. As you can see, we are basically going from main Servlet thread to Spring common thread, to Reactive CPU per-core thread. – Malvon Feb 23 '22 at 13:46
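
The try/catch idea from the comments above could be sketched roughly as follows. This is only a minimal sketch under stated assumptions: `CustomException` is a hypothetical stand-in for the question's exception class, the class and method names are made up for illustration, and the code needs Java 9+ for `List.of`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class JoinWithTryCatch {

    /** Hypothetical stand-in for the CustomException mentioned in the question. */
    static class CustomException extends RuntimeException {
        CustomException(String message) { super(message); }
    }

    /** Blocks on both futures; fails only if both failed, otherwise returns what succeeded. */
    static List<String> mergeOrFail(CompletableFuture<List<String>> f1,
                                    CompletableFuture<List<String>> f2) {
        List<String> merged = new ArrayList<>();
        List<Throwable> failures = new ArrayList<>();
        for (CompletableFuture<List<String>> f : List.of(f1, f2)) {
            try {
                merged.addAll(f.get());
            } catch (ExecutionException e) {
                // get() wraps the original failure; getCause() returns it
                failures.add(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        if (failures.size() == 2) {
            throw new CustomException(
                    failures.get(0).getMessage() + " and " + failures.get(1).getMessage());
        }
        // With exactly one entry in 'failures', this is where it could be logged.
        return merged;
    }
}
```

Because everything happens on the joining thread, the combined exception is thrown from the controller thread and can reach a Spring advice class in the usual way.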

3 Answers


Assuming two futures

CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …

a straightforward approach to combine the two futures is

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

but this future will fail if either future fails.
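
To make that failure behaviour concrete, here is a small self-contained sketch. The class and method names are illustrative only, and `List.of` assumes Java 9+.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ThenCombineDemo {

    /** Same combination as above, wrapped in a method so it can be reused. */
    static CompletableFuture<List<String>> combine(CompletableFuture<List<String>> a,
                                                   CompletableFuture<List<String>> b) {
        return a.thenCombine(b,
                (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> ok1 = CompletableFuture.completedFuture(List.of("a"));
        CompletableFuture<List<String>> ok2 = CompletableFuture.completedFuture(List.of("b"));
        CompletableFuture<List<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("service down"));

        System.out.println(combine(ok1, ok2).join());                       // both lists merged
        System.out.println(combine(ok1, failed).isCompletedExceptionally()); // true: one failure is enough
    }
}
```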

To fail only when both futures failed and construct a new exception from both throwables, we can define two utility methods:

private static Throwable getThrowable(CompletableFuture<?> f) {
    return f.<Throwable>thenApply(value -> null)
            .exceptionally(throwable -> throwable).join();
}

private static <T> T throwCustom(Throwable t1, Throwable t2) {
    throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}

The method getThrowable is intended to be used with a future already known to be completed exceptionally. We could call join and catch the exception, but as shown above, we can also transform the future into a non-exceptional future containing the throwable as its value.
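
One detail worth knowing when using getThrowable: the throwable that arrives in a dependent stage is normally a CompletionException wrapping the original failure, so getMessage() on it includes the class name of the cause. The sketch below illustrates this; the `unwrap` helper is a hypothetical addition, not part of the answer's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class GetThrowableDemo {

    /** Same helper as above: turns a failed future into a value holding its throwable. */
    static Throwable getThrowable(CompletableFuture<?> f) {
        return f.<Throwable>thenApply(value -> null)
                .exceptionally(throwable -> throwable).join();
    }

    /** Hypothetical helper: strips the CompletionException wrapper if there is one. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        Throwable t = getThrowable(failed);
        System.out.println(t.getClass().getSimpleName()); // CompletionException
        System.out.println(unwrap(t).getMessage());       // boom
    }
}
```

If the combined message should contain only the original messages, unwrapping before calling getMessage() is one option.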

Then, we can combine all of the above to

CompletableFuture<List<String>> failOnlyWhenBothFailed = both
    .thenApply(list -> both)
    .exceptionally(t ->
        !service1Result.isCompletedExceptionally()? service1Result:
        !service2Result.isCompletedExceptionally()? service2Result:
        throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
    .thenCompose(Function.identity());

Within the function passed to exceptionally, the incoming futures are already known to be completed, so we can use the utility methods to extract the throwables and throw a new exception.

The advantage of this is that the resulting construction is non-blocking.
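
Put together as one runnable unit, the non-blocking variant might look like the sketch below. `CustomException` is again a hypothetical stand-in, the two services are simulated with `supplyAsync`, and `List.of` assumes Java 9+.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FailOnlyWhenBothFailedDemo {

    /** Hypothetical stand-in for the question's CustomException. */
    static class CustomException extends RuntimeException {
        CustomException(String message) { super(message); }
    }

    static Throwable getThrowable(CompletableFuture<?> f) {
        return f.<Throwable>thenApply(value -> null)
                .exceptionally(throwable -> throwable).join();
    }

    static <T> T throwCustom(Throwable t1, Throwable t2) {
        throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
    }

    /** Returns a future that fails only if both inputs failed. */
    static CompletableFuture<List<String>> failOnlyWhenBothFailed(
            CompletableFuture<List<String>> s1, CompletableFuture<List<String>> s2) {
        CompletableFuture<List<String>> both = s1.thenCombine(s2,
                (l1, l2) -> Stream.concat(l1.stream(), l2.stream())
                                  .collect(Collectors.toList()));
        return both
                .thenApply(list -> both)
                .exceptionally(t ->
                        !s1.isCompletedExceptionally() ? s1 :
                        !s2.isCompletedExceptionally() ? s2 :
                        throwCustom(getThrowable(s1), getThrowable(s2)))
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        // Simulated services: the first succeeds, the second fails.
        CompletableFuture<List<String>> ok =
                CompletableFuture.supplyAsync(() -> List.of("a"));
        CompletableFuture<List<String>> bad =
                CompletableFuture.supplyAsync(() -> { throw new IllegalStateException("service 2 down"); });

        System.out.println(failOnlyWhenBothFailed(ok, bad).join()); // prints [a]
    }
}
```

When both inputs fail, join() on the returned future throws a CompletionException whose cause is the CustomException.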

But in your case, you want to wait for the completion rather than returning a future, so we can simplify the operation:

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

both.exceptionally(t -> null).join();

if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
  Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
  throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}

List<String> result = (
    service1Result.isCompletedExceptionally()? service2Result:
    service2Result.isCompletedExceptionally()? service1Result: both
).join();

By using both.exceptionally(t -> null).join(); we wait for the completion of both jobs, without throwing an exception on failures. After this statement, we can safely use isCompletedExceptionally() to check the futures we know to be completed.

So if both failed, we extract the throwables and throw our custom exception, otherwise, we check which task(s) succeeded and extract the result of either or both.
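
As a rough sketch, the blocking variant can be wrapped in a single method that also reports a lone failure, which is what the question asks for. The `errorLog` callback is a hypothetical placeholder for the asynchronous logging service, and `CustomException` is a stand-in for the question's class.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BlockingMergeDemo {

    /** Hypothetical stand-in for the question's CustomException. */
    static class CustomException extends RuntimeException {
        CustomException(String message) { super(message); }
    }

    static Throwable getThrowable(CompletableFuture<?> f) {
        return f.<Throwable>thenApply(value -> null)
                .exceptionally(throwable -> throwable).join();
    }

    /** Waits for both futures, throws only if both failed, reports a single failure to errorLog. */
    static List<String> mergeBlocking(CompletableFuture<List<String>> s1,
                                      CompletableFuture<List<String>> s2,
                                      Consumer<Throwable> errorLog) {
        CompletableFuture<List<String>> both = s1.thenCombine(s2,
                (l1, l2) -> Stream.concat(l1.stream(), l2.stream())
                                  .collect(Collectors.toList()));

        both.exceptionally(t -> null).join(); // wait for both without throwing

        boolean failed1 = s1.isCompletedExceptionally();
        boolean failed2 = s2.isCompletedExceptionally();
        if (failed1 && failed2) {
            Throwable t1 = getThrowable(s1), t2 = getThrowable(s2);
            throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
        }
        if (failed1) errorLog.accept(getThrowable(s1));
        if (failed2) errorLog.accept(getThrowable(s2));

        return (failed1 ? s2 : failed2 ? s1 : both).join();
    }
}
```

Since the exception is thrown on the calling thread, a Spring advice class can handle it like any other exception raised by the controller.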

Holger
  • I haven't noticed until this answer, but `exceptionally` does not allow to change the type of the initial `CompletableFuture`, that is why you resort to a very interesting: `f.thenApply(value -> null)`. Might not be obvious from this extensive answer, but I really liked it. – Eugene Feb 27 '22 at 20:06
  • @Holger Thanks. One question I had was, since these services are executed with spring `@Async` and return `CompletableFuture` with `CompletableFuture.completedFuture()`, what is the purpose of the last `join()` on `List<String> result`? P.S. With this method, I can use `isCompletedExceptionally()` to log when only an individual service fails and return the result for the other successful service. Answer: Oops! To get the actual underlying result. Sorry missed that. – Malvon Feb 28 '22 at 17:27


CompletableFutures are quite cumbersome to deal with, but here would be a more functional and reactive approach IMO.

We'll need that sequence method from https://stackoverflow.com/a/30026710/1225328:

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}
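
For reference, a quick usage sketch of sequence (class name is illustrative, `List.of` assumes Java 9+). Note that the resulting future fails as soon as any input has failed, which is why the next step converts failures into values first.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class SequenceDemo {

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> one = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> two = CompletableFuture.completedFuture(2);
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(sequence(List.of(one, two)).join());                       // prints [1, 2]
        System.out.println(sequence(List.of(one, failed)).isCompletedExceptionally()); // prints true
    }
}
```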

Then, I'm using Optional to represent the status of the operations, but a Try monad would suit better (so use one if you have such a utility in your codebase - Java doesn't bring one natively yet):

CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});
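
If you want the Try variant instead of Optional, so that both exceptions stay available for the combined error message, a minimal hand-rolled sketch could look like this. The `Try` class, its method names, and `CustomException` are all hypothetical; a library implementation would be richer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical minimal Try: holds either a value or the Throwable that prevented it. */
final class Try<T> {
    private final T value;
    private final Throwable error;

    private Try(T value, Throwable error) { this.value = value; this.error = error; }

    static <T> Try<T> success(T value) { return new Try<>(value, null); }
    static <T> Try<T> failure(Throwable error) { return new Try<>(null, error); }

    boolean isSuccess() { return error == null; }
    T value() { return value; }
    Throwable error() { return error; }

    /** Turns a future that may fail into one that always completes normally with a Try. */
    static <T> CompletableFuture<Try<T>> of(CompletableFuture<T> future) {
        return future.handle((value, error) -> {
            if (error == null) {
                return Try.<T>success(value);
            }
            // Failures are often wrapped in a CompletionException; keep the original cause.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            return Try.<T>failure(cause);
        });
    }
}

class TryMergeDemo {

    /** Hypothetical stand-in for the question's CustomException. */
    static class CustomException extends RuntimeException {
        CustomException(String message) { super(message); }
    }

    /** Fails only if both inputs failed; otherwise merges whatever succeeded. */
    static CompletableFuture<List<String>> merge(CompletableFuture<List<String>> f1,
                                                 CompletableFuture<List<String>> f2) {
        return Try.of(f1).thenCombine(Try.of(f2), (r1, r2) -> {
            if (!r1.isSuccess() && !r2.isSuccess()) {
                throw new CustomException(
                        r1.error().getMessage() + " and " + r2.error().getMessage());
            }
            List<String> merged = new ArrayList<>();
            if (r1.isSuccess()) merged.addAll(r1.value());
            if (r2.isSuccess()) merged.addAll(r2.value());
            return merged;
        });
    }
}
```

The Optional-based version in this answer works the same way, except that the exception information is dropped at the point where Optional.empty() is returned.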

Now wait for the two futures and handle the results once available:

CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
    Optional<List<Object>> result1 = results.get(0);
    Optional<List<Object>> result2 = results.get(1);
    if (result1.isEmpty() && result2.isEmpty()) {
        throw new CustomException(...);
    }
    // https://stackoverflow.com/a/18687790/1225328:
    // Flatten the lists that are present into one merged List<Object>
    return Stream.of(result1, result2)
            .flatMap(result -> result.map(List::stream).orElseGet(Stream::empty))
            .collect(Collectors.toList());
});

Ideally, you would then return mergedResults directly and let the framework deal with it for you, so that you don't block any thread. Alternatively, you can call .get() on it, which blocks the calling thread and throws an ExecutionException if your CustomException (or any other exception) was thrown; the original exception is accessible through e.getCause().
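
A small sketch of how the exception surfaces when blocking, with a hypothetical `CustomException` and made-up helper names: `get()` wraps the failure in a checked ExecutionException, while `join()` wraps it in an unchecked CompletionException. In both cases getCause() returns the exception thrown inside the stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoinDemo {

    /** Hypothetical stand-in for the question's CustomException. */
    static class CustomException extends RuntimeException {
        CustomException(String message) { super(message); }
    }

    /** A future whose last stage throws, similar to the both-failed branch above. */
    static CompletableFuture<String> failing() {
        return CompletableFuture.completedFuture("ignored")
                .thenApply(v -> { throw new CustomException("both services failed"); });
    }

    /** Returns the cause reported by get(), or null if the future succeeded. */
    static Throwable causeFromGet(CompletableFuture<?> f) {
        try {
            f.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Returns the cause reported by join(), or null if the future succeeded. */
    static Throwable causeFromJoin(CompletableFuture<?> f) {
        try {
            f.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```

In a controller you would typically rethrow the cause (or a wrapper of your choice) so that the advice class sees the CustomException rather than the wrapper.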


This would look simpler with Project Reactor (or equivalent), in case you're using it already, but the idea would be roughly the same.

sp00m
  • Thanks. Why is the code returning `List` instead of `List`? I don't think even updating it to `List` would compile. And if I understood the last paragraph, does mergedResults have access to **both** underlying exceptions of `service1` and `service2` through its `ExecutionException`? Another pointer, and something I didn't mention in my OP as I thought I could work it through the solution, but for the time being, I need an ability to only call `service1` conditionally and muting `service2` till it's ready. I don't know if this solution would have that flexibility. – Malvon Feb 23 '22 at 00:08
  • It’s weird to call for a Try Monad when `CompletableFuture` is already that. The `sequence` method at the beginning of your answer is already sufficient. Then, the next code snippet unnecessarily adds `Optional` to the solution, so the third code snippet is about getting rid of the `Optional` again, to end where you already were at the first snippet. Changing an arbitrary exception to `CustomException` could be done by simply chaining `.exceptionally(t -> { throw new CustomException(); });` without altering the result when no exception occurred. – Holger Feb 24 '22 at 17:20
  • @Holger I'm not sure how you could implement the logic OP requested (_"fail only if both futures failed, merge the results otherwise"_) without an additional structure like Try (or Optional in my case), given we need both results before knowing whether the whole process should be marked as failed or not. Would you mind posting an answer with your solution? I've been trying such a simpler approach but couldn't find my way out. – sp00m Feb 24 '22 at 17:54
  • @sp00m I added an answer – Holger Feb 24 '22 at 20:04


Just because I raised it as a possibility, I imagine something like this should work in a world using Project Reactor:

First we modify the services to return Monos, which is easy using Mono.fromFuture (or you can turn one service into Reactor style, if and once it is ready):

@Service
public class Serv1 implements ServInf {
    public Mono<List<Obj>> getSomething(int id) {
        // The service ensures that the list is never null, but it can be empty
        return Mono.fromFuture(CompletableFuture.completedFuture(/* calling an external RESTful API */));
        //This Mono will either emit the result or complete with an error in case of Exception
    }
}

//similar for Serv2

The (reactive) endpoint could look like this (please see numbered comments below):

public Mono<WrapperObj> getById(int id) { // int, to match getSomething(int id)
        WrapperObj wrapper = new WrapperObj(); //1
        Mono<Optional<List<Obj>>> s1Mono = serv1.getSomething(id)
            .subscribeOn(Schedulers.boundedElastic()) //2
            .map(Optional::ofNullable) //3
            .doOnError(wrapper::setS1ErrorResult) //4
            .onErrorResume(t -> Mono.just(Optional.empty())); //5

        Mono<Optional<List<Obj>>> s2Mono = serv2.getSomething(id)
            .subscribeOn(Schedulers.boundedElastic()) //2
            .map(Optional::ofNullable) //3
            .doOnError(wrapper::setS2ErrorResult) //4
            .onErrorResume(t -> Mono.just(Optional.empty())); //5

        return s1Mono
            .zipWith(s2Mono) //6
            .map(result ->
                //transforms non-error results and merges them into the wrapper object
                transformResult(result.getT1().orElse(null), result.getT2().orElse(null), wrapper) //7
            )
            .switchIfEmpty(Mono.just(wrapper)) //8

        ;
    }

Comments:

  1. The result is used to 'accumulate' the results and Exceptions

  2. Call services on the boundedElastic thread pool, which is recommended for longer IO tasks.

  3. Wrap the result in Optional. I am using an empty Optional as a convenience result for erroneous completion, since nulls don't propagate nicely through Reactor.

  4. If the service call throws an exception, we can set the corresponding erroneous result on the WrapperObj. This is similar to your use of AtomicReference, but without creating additional objects.

  5. However, such an exception would cause zipWith (6) to fail, so if this happens we substitute the Optional.empty() result.

  6. zipWith creates a tuple of both results

  7. We process these results, replacing an empty Optional with null before passing both to transformResult.

  8. What's left is to transform the two (non-exceptional) results:

    private WrapperObj transformResult(List<Obj> s1Result, List<Obj> s2Result, WrapperObj wrapper) {
        //perform your result transformation and
        //flesh out 'wrapper' with the results
        //if there was an exception, the 'wrapper' contains the corresponding exception values
        return wrapper;
    }
    
Thomas Timbul