
I have two methods.
Main method:

@PostMapping("/login")
public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
    return socialService.verifyAccount(loginUser)
            .flatMap(socialAccountIsValid -> {
                if (socialAccountIsValid) {
                    return this.userService.getUserByEmail(loginUser.getEmail())
                            .switchIfEmpty(insertUser(loginUser))
                            .flatMap(foundUser -> updateUser(loginUser, foundUser))
                            .map(savedUser -> {
                                String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                return new ResponseEntity<>(HttpStatus.OK);
                            });
                } else {
                    return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                }
            });

}

And this invoked method (the service calls an external api):

public Mono<User> getUserByEmail(String email) {
    UriComponentsBuilder builder = UriComponentsBuilder
            .fromHttpUrl(USER_API_BASE_URI)
            .queryParam("email", email);
    return this.webClient.get()
            .uri(builder.toUriString())
            .exchange()
            .flatMap(resp -> {
                if (Integer.valueOf(404).equals(resp.statusCode().value())) {
                    return Mono.empty();
                } else {
                    return resp.bodyToMono(User.class);
                }
            });
} 

In the above example, switchIfEmpty() is always called from the main method, even when a result with Mono.empty() is returned.

I cannot find a solution for this simple problem.
The following also doesn't work:

Mono.just(null) 

Because the method will throw a NullPointerException.

What I also can't use is the flatMap method to check that foundUser is null.
Sadly, flatMap doesn't get called at all in case I return Mono.empty(), so I cannot add a condition here either.

@SimY4

    @PostMapping("/login")
    public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
        userExists = false;
        return socialService.verifyAccount(loginUser)
                .flatMap(socialAccountIsValid -> {
                    if (socialAccountIsValid) {
                        return this.userService.getUserByEmail(loginUser.getEmail())
                                .flatMap(foundUser -> {
                                    return updateUser(loginUser, foundUser);
                                })
                                .switchIfEmpty(Mono.defer(() -> insertUser(loginUser)))
                                .map(savedUser -> {
                                    String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                    return new ResponseEntity<>(HttpStatus.OK);
                                });
                    } else {
                        return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                    }
                });

    }
lkatiforis
html_programmer
  • I am not sure whether I get this sentence right: "switchIfEmpty() is always called from the main method, even when a result with Mono.empty() is returned." It is meant to be called, isn't it? – Barath Jan 26 '19 at 05:39
  • Can you elaborate on your problem a bit more? "switchIfEmpty() is always called from the main method, even when a result with Mono.empty() is returned." This is the expected behaviour. – Prashant Pandey Jan 26 '19 at 08:21
  • @Barath What I want to achieve is that if the external service returns 404, I can return a Mono with value `null` from the service layer which can be handled by the main method. I guess I could throw an error too, but prefer not to. The 404 should be handled on the service layer, and when a user is not found, this is application logic which I feel I should handle with `if`, and not by exception handling. I'm going to review `switchIfEmpty` in the docs. Still, a working suggestion? – html_programmer Jan 26 '19 at 15:06
  • @PrashantPandey Please see above comment. – html_programmer Jan 26 '19 at 15:06
  • @Trace, your code still works, if 404, you are returning ```Mono.empty() ``` which is going to call ```switchIfEmpty```. Anyways if you want to handle errors if that is what you are looking for then you can use ```onErrorResume()``` and handle appropriately or you can also use ```onErrorReturn()```. [guide](https://www.baeldung.com/spring-webflux-errors) – Barath Jan 26 '19 at 16:23
  • @guide The problem is that switchIfEmpty also gets called when the Mono is not empty, which I don't want. The condition would be -> insert if empty, else update. – html_programmer Jan 26 '19 at 16:33
  • definitely it cant be. Kindly enable the logs and share the logs output – Barath Jan 26 '19 at 16:46
  • @Trace This is how I would handle this: If the response contains a 404, throw a wrapped exception. Then, in the main method, fall back to an alternate Mono using onErrorResume(insertUser(loginUser)). If you don't want to throw an exception, then your logic should work just fine: return an empty observable and handle it using switchIfEmpty. – Prashant Pandey Jan 26 '19 at 18:36
  • @Trace The problem is that switchIfEmpty also gets called when the Mono is not empty: Are you sure? Can you debug a bit, since this should not happen. – Prashant Pandey Jan 26 '19 at 18:36
  • @PrashantPandey The `insertUser` method in `switchIfEmpty ` gets called before the flatMap method handling the 404 gets called. No idea why this is the case. – html_programmer Jan 27 '19 at 00:08
  • @Trace Prolly a silly idea but could you add a .publishOn(Schedulers.parallel()) after the flatmap in the getUserByEmail method? Asking this since I am doing something very similar right now and it is working for me. – Prashant Pandey Jan 28 '19 at 19:34
  • @Trace can you please provide insertUser(loginUser) method – Ricard Kollcaku Jan 30 '19 at 10:05

2 Answers


It's because switchIfEmpty accepts its alternative Mono "by value". Meaning that even before you subscribe to your mono, the expression that builds the alternative mono has already been evaluated, along with any side effects it has.

Imagine a method like this:

Mono<String> asyncAlternative() {
    return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }));
}

If you define your code like this:

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

It'll always trigger the alternative during stream construction, no matter what. To address this, you can defer evaluation of the second mono by using Mono.defer:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

This way it will only print "Hi there" when the alternative is actually requested.
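
The same eager-versus-deferred distinction can be reproduced without Reactor. The following is only a plain-Java sketch, using `java.util.Optional` as a stand-in for `Mono`: `orElse` plays the role of `switchIfEmpty(alternative)` and `orElseGet` the role of `switchIfEmpty(Mono.defer(...))`. The class name `EagerArgumentDemo` and the `alternative` helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class EagerArgumentDemo {
    // Records every call to alternative(), standing in for the "Hi there" side effect.
    static final List<String> calls = new ArrayList<>();

    // Hypothetical alternative with a visible side effect.
    static String alternative(String label) {
        calls.add(label);
        return "Alternative";
    }

    static List<String> run() {
        calls.clear();
        Optional<String> payload = Optional.of("Some payload");

        // Eager: alternative("orElse") is evaluated before orElse is even entered,
        // because Java evaluates method arguments before the call.
        String eager = payload.orElse(alternative("orElse"));

        // Deferred: the supplier is only invoked if the Optional is empty.
        String lazy = payload.orElseGet(() -> alternative("orElseGet"));

        return new ArrayList<>(calls);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [orElse]
    }
}
```

Only the eager variant records a call, even though the payload is present in both cases.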

UPD:

Elaborating a little on my answer. The problem you're facing is not related to Reactor but to the Java language itself and how it evaluates method arguments. Let's examine the code from the first example I provided.

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

We can rewrite this into:

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = asyncAlternative();
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

These two code snippets are semantically equivalent. We can continue unwrapping them to see where the problem lies:

Mono<String> firstMono = Mono.just("Some payload");
CompletableFuture<String> alternativePromise = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }); // future computation already triggered
Mono<String> alternativeMono = Mono.fromFuture(alternativePromise);
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

As you can see, the future computation was already triggered at the point when we started composing our Mono types. To prevent unwanted computations, we can wrap our future in a deferred evaluation:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

Which will unwrap into

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }))); // future computation deferred
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

In the second example the future is trapped in a lazy supplier and is scheduled for execution only when it is requested.
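
To see the "trapped in a lazy supplier" idea in isolation, here is a minimal sketch that uses only the JDK. A plain `java.util.function.Supplier` stands in for `Mono.defer`, and the counter, class name and method names are assumptions made up for this illustration; it is not how Reactor implements `defer` internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferredFutureDemo {
    // Counts how many times the async computation has actually run.
    static final AtomicInteger started = new AtomicInteger();

    // supplyAsync schedules the task immediately when this method is called.
    static CompletableFuture<String> alternativePromise() {
        return CompletableFuture.supplyAsync(() -> {
            started.incrementAndGet();
            return "Alternative";
        });
    }

    // Eager: merely creating the future triggers the computation.
    static int eagerStarts() {
        started.set(0);
        CompletableFuture<String> future = alternativePromise();
        future.join(); // wait for completion so the count is deterministic
        return started.get();
    }

    // Deferred: wrapping the call in a Supplier runs nothing until get() is invoked.
    static int deferredStarts() {
        started.set(0);
        Supplier<CompletableFuture<String>> deferred = () -> alternativePromise();
        // deferred.get() is intentionally never called here.
        return started.get();
    }

    public static void main(String[] args) {
        System.out.println("eager: " + eagerStarts());       // prints eager: 1
        System.out.println("deferred: " + deferredStarts()); // prints deferred: 0
    }
}
```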

UPD: 2022:

For some time now, Project Reactor has come with an alternative API for wrapping eagerly computed futures, which achieves the same result by trapping the eager computation in a lazy supplier:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.fromCompletionStage(() -> alternativePromise()));
Alex
  • I tried this, but the method as you define `asyncAlternative()`, always gets triggered despite the `Mono.defer()`. – html_programmer Jan 30 '19 at 19:25
  • @Trace can you show me what you've tried? Because defer's only purpose is to not allow evaluation happen before time. – Alex Jan 30 '19 at 19:34
  • Please see updated post. In debug mode I've seen that the `Mono.defer` is called after, but that doesn't take away that it always gets executed, even when `this.userService.getUserByEmail(loginUser.getEmail())` does not return `Mono.empty()`. – html_programmer Jan 30 '19 at 19:44
  • `It'll always trigger alternative no matter what during stream construction.` Then what's the use. It's a method named `switchIfEmpty`. Some things really don't make sense. – html_programmer Jan 30 '19 at 19:54
  • Thanks for the explanation. But as you see in my code example, I use your deferred method, but the callback still always gets executed despite using `defer`. The `getUserByEmail` returns a Mono, so do the methods `insertUser` and `updateUser`. I really don't understand why. – html_programmer Feb 01 '19 at 00:22
  • Your answer was correct. I broke it down, turned out that the reason why `switchIfEmpty` got triggered was because in fact `updateUser` returned an empty body with http status code `204`! I modified the api a bit reluctantly, but now it works correctly. Thanks for this! – html_programmer Feb 01 '19 at 00:35
  • Do we really need `Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> { System.out.println("Hi there"); return "Alternative"; }))); ` as whole or just `Mono.defer(() -> "Alternative");` is enough? – Akshay Jan 22 '20 at 19:13
  • @Akshay the rule of thumb is: if the alternative branch has side effects - then defer it. Mono.defer(() -> "Alternative") has no side effects so deferring is not necessary. – Alex Jan 22 '20 at 19:35
  • I understood defer, my question is do we need `Mono.fromFuture(CompletableFuture.supplyAsync(())...` with defer? or just `Mono.defer()` is enough? (Let's assume branch has side effect.) – Akshay Jan 23 '20 at 06:30
  • You don't need a future to make reactive code concurrent. In fact, I’d recommend avoiding that because of the issues described above. But sometimes it's a necessity. – Alex Jan 23 '20 at 06:37
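
The resolution reported in the comments (the update step completing with no body, so the fallback ran even though a user was found) can be sketched in plain Java. This is only an analogy under stated assumptions: `java.util.Optional` stands in for `Mono`, `orElseGet` for `switchIfEmpty(Mono.defer(...))`, and every method name below is a hypothetical stand-in rather than the asker's actual code.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class EmptyAfterFlatMapDemo {
    // Counts how often the fallback "insert" path runs.
    static final AtomicInteger inserts = new AtomicInteger();

    // Hypothetical update that answers with no body (like an HTTP 204).
    static Optional<String> updateReturningNothing(String user) {
        return Optional.empty();
    }

    // Hypothetical update that echoes the saved user back.
    static Optional<String> updateReturningUser(String user) {
        return Optional.of(user + " (updated)");
    }

    static String insertUser() {
        inserts.incrementAndGet();
        return "new user";
    }

    // The user is found, but the update yields nothing, so the chain is
    // empty by the time the fallback is consulted and the insert runs.
    static int insertsWhenUpdateIsEmpty() {
        inserts.set(0);
        Optional.of("found user")
                .flatMap(EmptyAfterFlatMapDemo::updateReturningNothing)
                .orElseGet(EmptyAfterFlatMapDemo::insertUser);
        return inserts.get();
    }

    // Once the update returns a value, the fallback is skipped.
    static int insertsWhenUpdateReturnsUser() {
        inserts.set(0);
        Optional.of("found user")
                .flatMap(EmptyAfterFlatMapDemo::updateReturningUser)
                .orElseGet(EmptyAfterFlatMapDemo::insertUser);
        return inserts.get();
    }

    public static void main(String[] args) {
        System.out.println(insertsWhenUpdateIsEmpty());     // prints 1
        System.out.println(insertsWhenUpdateReturnsUser()); // prints 0
    }
}
```

In other words, the fallback reacts to emptiness at its own position in the chain, not to whether the original lookup found something.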
40

For those who, despite the well-voted answer, still do not understand why this behaviour occurs:

Reactor sources (Mono.xxx & Flux.xxx) are either:

  • Lazily evaluated : the content of the source is evaluated/triggered only when a subscriber subscribes to it;

  • or eagerly evaluated : the content of the source is immediately evaluated even before the subscriber subscribes.

Expressions like Mono.just(xxx), Flux.just(xxx), Flux.fromIterable(xxx) are eager: their arguments are computed as soon as the expression is evaluated.

By using defer(), you force the source to be lazily evaluated. That's why the accepted answer works.

So doing this:

 someMethodReturningAMono()
  .switchIfEmpty(buildError());

with buildError() relying on an eager source to create the alternative Mono, means that the alternative will ALWAYS be evaluated before the subscription:

Mono<String> buildError(){
       return Mono.just("An error occurred!"); //<-- evaluated as soon as read
}

To prevent that, do this:

 someMethodReturningAMono()
  .switchIfEmpty(Mono.defer(() -> buildError()));

Read this answer for more.

Philippe Simo