58

I'm developing a app with Spring Boot 2.0 and Kotlin using the WebFlux framework.

I want to check if a user id exits before save a transaction. I'm stucked in a simple thing like validate if a Mono is empty.

fun createTransaction(serverRequest: ServerRequest) : Mono<ServerResponse> {
    val transaction = serverRequest.body(BodyExtractors.toMono(Transaction::class.java))

    transaction.flatMap {
        val user = userRepository.findById(it.userId)
        // If it's empty, return badRequest() 
    } 

    return transaction.flatMap { transactionRepository.save(it).then(created(URI.create("/transaction/" + it.id)).build()) }
}

It is possible to do what I want?

Raedwald
  • 46,613
  • 43
  • 151
  • 237
voliveira89
  • 1,134
  • 2
  • 9
  • 22

5 Answers5

60

The techniques that allow checking whether Flux/Mono is empty

Using operators .switchIfEmpty/.defaultIfEmpty/Mono.repeatWhenEmpty

Using mentioned operators you will be able to react to the case when Stream has been completed without emitting any elements.

First of all, remember that operators such .map, .flatMap, .filter and many others will not be invoked at all if there no onNext has been invoked. That means that in your case next code

transaction.flatMap {
    val user = userRepository.findById(it.userId)
    // If it's empty, return badRequest() 
} 

return transaction.flatMap { transactionRepository.save(it).then(created(URI.create("/transaction/" + it.id)).build()) }

will not be invoked at all, if transaction will be empty.

In case if there is a requirement for handling cases when your flow is empty, you should consider operators like next in the following manner:

transaction
   .flatMap(it -> {
      val user = userRepository.findById(it.userId)
   })
   .swithIfEmpty(Flux.defer(() -> Flux.just(badRequest())));

Actual solution

Also, I have noted that you created two sub-flows from the main transaction. Actually, following code will not be executed at all:

transaction.flatMap {
    val user = userRepository.findById(it.userId)
    // If it's empty, return badRequest() 
}  

and will be only executed the last one, which is returned from the method. That happens because you ain't subscribed using operator .subscribe(...).

The second point, you can't subscribe to the same request body more the one time (kind of limitation for WebClient's reponse). Thus you are required to share your request body in the next way, so completed example will be:

fun createTransaction(serverRequest: ServerRequest): Mono<ServerResponse> {
    val transaction = serverRequest.body(BodyExtractors.toMono(Transaction::class.java)).cache()

    transaction
            .flatMap { userRepository.findById(it.userId) }
            .flatMap { transaction.flatMap { transactionRepository.save(it) } }
            .flatMap { ServerResponse.created(URI.create("/transaction/" + it.id)).build() }
            .switchIfEmpty(transaction.flatMap { ServerResponse.badRequest().syncBody("missed User for transaction " + it.id) })
}

Or more simple case without sharing transaction flow but using Tuple:

fun createTransaction(serverRequest: ServerRequest): Mono<ServerResponse> {
    val emptyUser = !User()
    val transaction = serverRequest.body<Mono<Transaction>>(BodyExtractors.toMono(Transaction::class.java))

    transaction
            .flatMap { t ->
                userRepository.findById(t.userId)
                        .map { Tuples.of(t, it) }
                        .defaultIfEmpty(Tuples.of(t, emptyUser))
            }
            .flatMap {
                if (it.t2 != emptyUser) {
                    transactionRepository.save(it.t1)
                            .flatMap { ServerResponse.created(URI.create("/transaction/" + it.id)).build() }
                } else {
                    ServerResponse.badRequest().syncBody("missed User for transaction " + it.t1.id)
                }
            }
}
belwood
  • 3,320
  • 11
  • 38
  • 45
Oleh Dokuka
  • 11,613
  • 5
  • 40
  • 65
  • The first solution doesn't compile... Could you check it? Also, there is no badRequest method with a string parameter. – voliveira89 Jan 27 '18 at 23:25
  • @voliveira, Could you point me to which language this example has been written since I just tried to follow your code convention – Oleh Dokuka Jan 28 '18 at 07:35
  • @voliveira89 fixed – Oleh Dokuka Jan 28 '18 at 22:01
  • Doesn't compile also! The example you gave (that is totally equals to what I wrote in my code) makes sense, but intellij raise a error in the bracket that closes the function: "A 'return' expression required in a function with a block body ('{...}')" – voliveira89 Jan 29 '18 at 21:57
  • could you please point me to the particular line in the code snippet above? – Oleh Dokuka Jan 30 '18 at 08:01
  • The last line of the first solution (the one that you edited). – voliveira89 Jan 30 '18 at 08:40
  • I have a issue where I need to return an empty Mono according to a condition. Further code flow is dependent on this empty Mono, How do I make sure if it returning an empty Mono? – Tarnished-Coder May 12 '21 at 15:53
  • @shapan-dashore you may use `log` operator after your flatMap to ensure that it emits only `onComplete` signal. Otherwise, if you need some fallback logic - you may put `defaultIfEmpty` operator which can handle the empty scenario and do something about it as a fallback – Oleh Dokuka May 12 '21 at 15:55
  • `Mono.fromFuture(() -> client.getItem(request)) .map(GetItemResponse::item) .filter(item -> !item.isEmpty()) .map(item -> ItemData.builder() .data(item.get("value").s()) .etag(item.get("etag").s()) .build())` I will try the log operator but from a cursory look at the above code what would it return if `item` object is empty, I want this to return an empty Mono. – Tarnished-Coder May 12 '21 at 16:58
7

You can check it using the Mono's provided method hasElement() which is analogous to Optional's isPresent(). The method definition is :

Mono<Boolean> hasElement()

for more details checkout : project reactor documentation

In case you have to perform some action based on this value you can further use switchIfEmpty() to provide with alternate publisher.

Poorvi
  • 101
  • 1
  • 3
3

Use Mono with Optional:

return findExistingUserMono
  .map(Optional::of)
  .defaultIfEmpty(Optional.empty())
  .flatMap(optionalUser -> {
    if(optionalUser.isPresent()) {
      return Mono.error('xxxx');
    }
    return this.userService.create(optionalUser.get());
  });

This way it will always emit Optional value so that the stream will never break.

PeiSong
  • 871
  • 1
  • 7
  • 12
2

Let me start by saying I am a newbie on reactive (java) and on this forum. I think you cannot really check in this code if a mono is empty because a mono represents code that will be executed later on, so in this code body you won't know yet if its is empty. Does that make sense?

I just wrote something similar in Java which seems to work (but not 100% this is the best approach either):

    public Mono<ServerResponse> queryStore(ServerRequest request) { 

        Optional<String> postalCode = request.queryParam("postalCode");                            

        Mono<ServerResponse> badQuery = ServerResponse.badRequest().build();
        Mono<ServerResponse> notFound = ServerResponse.notFound().build();

        if (!postalCode.isPresent()) { return  badQuery; }

        Flux<Store> stores = this.repository
                .getNearByStores(postalCode.get(), 5);

        return ServerResponse.ok().contentType(APPLICATION_JSON)
                .body(stores, Store.class)
                .switchIfEmpty(notFound);
}
Arno
  • 21
  • 4
0

We can use switchIfEmpty method for this

Below example, I'm checking if the user exists with email if not then add it

userRepository.findByEmail(user.getEmail())
                .switchIfEmpty(s -> {
                    user.setStatus("InActive");
                    String encodedPassword = DigestUtils.sha256Hex(user.getPassword());
                    user.setPassword(encodedPassword);
                    userRepository.save(user).subscribe();
                    s.onComplete();
                }).then(Mono.just(user));
Anil
  • 19
  • 1
  • There is no method with the signature `switchIfEmpty(Consumer)`. There is only `switchIfEmpty(Mono extends T> alternate)`, see https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#switchIfEmpty-reactor.core.publisher.Mono- – Honza Zidek Nov 04 '22 at 19:53