21

I have the code below, which returns a Mono<Foo>:

try {
    return userRepository.findById(id)  // step 1
        .flatMap(user -> barRepository.findByUserId(user.getId())  // step 2
            .map(bar -> Foo.builder().msg("Already exists").build())  // step 3
            .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())  // step 4
                .map(bar -> Foo.builder().msg("Created").build())  // step 5
            ))
        .doOnError(throwable -> Mono.just(handleError(throwable)));
} catch (Exception e) {
    log.error("from catch block");
    return Mono.just(handleError(e));
}

If an error occurs in step 1 (e.g. the user does not exist for the specified id), will it be caught by doOnError, by the try-catch block, or by neither?

Same question if an error happens in step 2, step 3, step 4.

What is the correct code so that errors are always caught by doOnError and the try-catch can be eliminated?

I am using:

public interface UserRepository extends ReactiveMongoRepository<User, String>

same for barRepository.

handleError(throwable) simply calls log.error(e.getMessage()) and returns a Foo.

Saikat
ace

6 Answers

15

I think the first error is in the title: "Mono or Flux" is not related to error handling.

  • Mono can emit at most one item (a stream of zero or one element)
  • Flux can emit zero to N items (a stream of multiple elements, e.g. the contents of a List)

To handle errors you can follow this example:

return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToMono(ModelYouAreRetrieving.class)
                .doOnError(throwable -> logger.error("Failed for some reason", throwable))
                .onErrorReturn(new ModelYouAreRetrieving(...))
                .block();
Andrea Ciccotta
5

doOnError only performs side effects. Assuming findById returns a Mono.error() if it fails, something like this should work:

return userRepository.findById(id)
    .flatMap(user ->
        barRepository.findByUserId(user.getId())
            .map(bar -> Foo.builder().msg("Already exists").build())
            .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
                .map(bar -> Foo.builder().msg("Created").build())))
    .onErrorReturn(throwable -> Mono.just(handleError(throwable)));

The try-catch will only work if you either call a blocking operation on the chain, or a runtime error occurs before you enter the reactive chain. The doOn* operators do not modify the chain; they are used for side effects only. Since flatMap expects a producer, you need to return a Mono from the call, and if an error occurs in that case, it simply propagates. In all reactive chains, an error propagates unless it is otherwise handled.
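
The same distinction can be tried out without Reactor on the classpath. What follows is only a minimal sketch that swaps in the JDK's CompletableFuture as a stand-in; it is not Reactor code. Under that assumption, whenComplete loosely plays the role of doOnError (it observes the failure but does not recover from it), and exceptionally loosely plays the role of the onError* fallback operators. The class name AsyncErrorSketch and its methods are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only stand-in for a reactive chain; all names here are illustrative.
final class AsyncErrorSketch {

    // Counts how often the side-effect callback observed a failure.
    static final AtomicInteger observedFailures = new AtomicInteger();

    // A pipeline whose source fails asynchronously, comparable to an error signal.
    static CompletableFuture<String> failingPipeline() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // Side effect only: the failure is observed, but the result is still a failure.
    static boolean sideEffectOnly() {
        CompletableFuture<String> result;
        try {
            result = failingPipeline().whenComplete((value, error) -> {
                if (error != null) {
                    observedFailures.incrementAndGet();
                }
            });
        } catch (Exception e) {
            // Not reached: building the pipeline does not throw.
            return false;
        }
        try {
            result.join(); // only the blocking call surfaces the exception
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    // Fallback: the failure is replaced by a value, so join() succeeds.
    static String withFallback() {
        return failingPipeline()
                .exceptionally(error -> "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sideEffectOnly());
        System.out.println(withFallback());
    }
}
```

In this sketch the try-catch around the pipeline construction never fires, the side-effect callback leaves the failure in place, and only the fallback stage turns the failure into a value.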

James Ralston
  • this answer does not compile, I get error on .onErrorReturn (types mismatch). Also will .onErrorReturn handle error if userRepository.findById(id) fails i.e. specified user does not exist in database? – ace Jun 25 '18 at 16:25
  • I don't know the return types of the functions you have, or how the repo function returns when nothing is found. You need more details to get a compile ready solution. – James Ralston Jun 25 '18 at 18:42
  • reading the documentation of findById, it returns an `empty()` `Flux` if the entity is not found, that is to say one that only emit the `onComplete` signal. so no it won't pass through `onErrorReturn` – Simon Baslé Jun 26 '18 at 13:19
  • I can modify the example, it was not clear which repo library he would be using. `reactiveRepository` would have been more clear. – James Ralston Jun 26 '18 at 15:35
  • @Simon I am using public interface UserRepository extends ReactiveMongoRepository same for barRepository. would appreciate if u can give second answer – ace Jun 26 '18 at 18:02
4

Use Exceptions.propagate(e), which wraps a checked exception into a special runtime exception that can be handled by onError.

The code below tries to convert User attributes to upper case. When it encounters kyle, the checked exception is thrown and MIKE is returned from onErrorReturn.

@Test
void Test19() {
    Flux.fromIterable(Arrays.asList(new User("jhon", "10000"),
            new User("kyle", "bot")))
        .map(x -> {
            try {
                return toUpper(x);
            } catch (TestException e) {
                throw Exceptions.propagate(e);
            }
        })
        .onErrorReturn(new User("MIKE", "BOT")).subscribe(x -> System.out.println(x));
}

protected final class TestException extends Exception {
    private static final long serialVersionUID = -831485594512095557L;
}

private User toUpper(User user) throws TestException{
    if (user.getName().equals("kyle")) {
        throw new TestException();
    }
    return new User(user.getName().toUpperCase(), user.getProfession().toUpperCase());
}

Output

User [name=JHON, profession=10000]
User [name=MIKE, profession=BOT]
quintin
2

@Gianluca Pinto's last line of code is also incorrect; it won't compile. onErrorReturn is not suitable for complicated error handling. What you should use is onErrorResume.

see: https://grokonez.com/reactive-programming/reactor/reactor-handle-error#21_By_falling_back_to_another_Flux

onErrorResume falls back to another Flux (or Mono) and lets you catch and handle the exception thrown by the previous one. If you look at the implementation of onErrorReturn, you will find that it actually uses onErrorResume.

So here the code should be:

.onErrorResume(throwable -> Mono.just(handleError(throwable)));
Shuai Wang
0

The last line of @James Ralston's code is wrong. The correct code should be:

return userRepository.findById(id)
    .flatMap(user ->
        barRepository.findByUserId(user.getId())
            .map(bar -> Foo.builder().msg("Already exists").build())
            .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
                .map(bar -> Foo.builder().msg("Created").build())))
    .onErrorReturn(Mono.just(handleError(throwable)));
Gianluca Pinto
0

When building a reactive flow, use the onError* operators to handle errors, because they provide a fallback Mono/Flux, whereas the doOn* operators are side-effect operators only.

NOTE: The examples are in Kotlin

Below is an example:

fun saveItems(item: Item) = testRepository.save(item)
        .onErrorResume {
            Mono.error(
                onErrorResumeHandler(
                    it,
                    "APP-1002",
                    "Error occurred while saving the something :P, contact admin"
                )
            )
        }

fun onErrorResumeHandler(exception: Throwable, errorCode: String, errorMessage: String) =
    if (exception is TestRepositoryException) exception else
        TestServiceException(errorCode, errorMessage)

There should also be a central exception handler, which we can create by extending AbstractErrorWebExceptionHandler. The order is set to -2 so that it takes precedence over the default handler.

Below is an example:

@Component
@Order(-2)
class BaseControllerAdvice(
    errorAttributes: ErrorAttributes,
    resources: WebProperties.Resources,
    applicationContext: ApplicationContext,
    serverCodecConfigurer: ServerCodecConfigurer
) : AbstractErrorWebExceptionHandler(errorAttributes, resources, applicationContext) {

    val log = logger()

    init {
        setMessageWriters(serverCodecConfigurer.writers)
    }

    override fun getRoutingFunction(errorAttributes: ErrorAttributes?) =
        router {
            RequestPredicates.all().invoke(this@BaseControllerAdvice::renderErrorResponse)
        }
    //RouterFunctions.route(RequestPredicates.all(),this::renderErrorResponse)

    fun renderErrorResponse(
        request: ServerRequest
    ): Mono<ServerResponse> {
        val errorPropertiesMap = getErrorAttributes(
            request,
            ErrorAttributeOptions.defaults()
        )
        val ex: ApplicationException = getError(request) as ApplicationException
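        log.info("Error attributes:{}", errorPropertiesMap)
        // Named arguments, because ErrorResponseVO declares errorMessage before errorCode
        return ServerResponse.status(HttpStatus.BAD_REQUEST)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(ErrorResponseVO(errorCode = ex.errorCode, errorMessage = ex.errorMessage)))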
    }

    data class ErrorResponseVO(val errorMessage: String, val errorCode: String)

}
Dharmvir Tiwari