0

can you help me understand why you give me this warning?

Essentially this POST call gives me results (userPayload), that I reuse in a further POST call and save the data to db.

What's wrong?

public Mono<ResponseEntity> createUser(UserRequest requestPayload) {

   return webClientBuilder
            .build()
            .post()
            .uri(settings.getUrl())
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .header("Authorization", settings.getApiToken())
            .body(BodyInserters.fromValue(requestPayload))
            .exchange()
            .flatMap(clientResponse -> {
                if (clientResponse.statusCode().isError()) {
                    return clientResponse.bodyToMono(Error.class)
                            .flatMap(error -> Mono.error(new CustomException(clientResponse.statusCode(), error)));
                } else {
                    return clientResponse.bodyToMono(UserPayload.class)
                            .flatMap(user -> {
                                saveNewUser(user);
                                validateUser(user.getLinks());
                                return Mono.just(new ResponseEntity<>(user, HttpStatus.OK));
                            }).switchIfEmpty(Mono.error(new NotFoundCustomException("User Payload not found!")));
                }
            }); 

}

Second method:

private Mono validateUser(String uri) {

    webClientBuilder
            .build()
            .post()
            .uri(uri)
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .header("Authorization", settings.getApiToken())
            .retrieve()
            .onStatus(HttpStatus::isError, clientResponse -> clientResponse.bodyToMono(Error.class)
                    .flatMap(error -> Mono.error(new CustomException(clientResponse.statusCode(), error)))
            ).bodyToMono(Void.class);
}
  • 1
    I am somewhat new to Rx myself, however my understanding is to try not to block. In your case I think you have two option, the first is have the `WebClient` return the response body. The other is to replace the `block` in the first call with `map(e -> saveUser)` (plus a map for validating the user). for your error condition, try `switchOnEmpty` – Gavin Feb 11 '21 at 10:09
  • ok sounds good. I have successfully modified the first method. If I wanted to remove the block () also from the second method, what should I do? I tried but it doesn't call me, it gets to flatMap () and it seems to stop. – EngineerTop Feb 11 '21 at 11:08
  • I dont think you need the `flatMaps`. WebClient has a method where you can handle errors from the http socket: `.onStatus(HttpStatus::isError response -> ...`. you can use that to throw your custom exceptions, then you can do `bodyToMono`, then I dont know as I dont know your use case, perhaps `validateUser` should return `Mono`. The way I approach Webflux/Rx is to think of it as a Stream (akin to Collection Streams) – Gavin Feb 11 '21 at 11:16
  • ok I changed the two methods, the problem is always the same, the second method (validateUser) seems not to execute the call correctly. What could it be? – EngineerTop Feb 11 '21 at 11:28
  • you are breaking the chain and ignoring the return from `saveNewUser(user);` you need to `return` or `flatMap` or `then`, and call `validateUser`. – Toerktumlare Feb 11 '21 at 11:32
  • saveNewUser(user) is a void method. how can I do? – EngineerTop Feb 11 '21 at 13:35
  • then you need to update question with that code, we need to know what it does, is it blocking or not? – Toerktumlare Feb 11 '21 at 16:37
  • Given `repository.save(...)` returns a `User` synchronously, I suppose you're using a JDBC-based library (e.g. JPA), which doesn't play well with WebFlux since JDBC blocks on the IOs. Ideally, you would move to a non-blocking persistence layer (e.g. [R2DBC](https://spring.io/projects/spring-data-r2dbc)). A workaround is to block on another thread pool (which is better than blocking the request thread pool), yet it doesn't feel right, does it? Perhaps the Web MVC stack would better suit? More details [here](https://stackoverflow.com/a/65185737/1225328). – sp00m Feb 11 '21 at 17:44

1 Answers1

0

In the end, I solved it like this:

public Mono<ResponseEntity<UserPayload>> createUser(UserRequest requestPayload) {

        return webClientBuilder
                .build()
                .post()
                .uri(settings.getUrl())
                .accept(MediaType.APPLICATION_JSON)
                .contentType(MediaType.APPLICATION_JSON)
                .header("Authorization", settings.getApiToken())
                .body(BodyInserters.fromValue(requestPayload))
                .retrieve()
                .onStatus(HttpStatus::isError, errorResponse -> errorResponse
                        .bodyToMono(Error.class)
                        .flatMap(error -> Mono.error(new CustomException(errorResponse.statusCode(), error))))
                .bodyToMono(UserPayload.class)
                .flatMap(user -> {
                return Mono.fromSupplier(()->repository.save(convertFromPayloadUser(user)))
                        .subscribeOn(Schedulers.boundedElastic())
                        .then(validateUser(user.getLinks()))
                        .then(Mono.just(new ResponseEntity<>(user, HttpStatus.OK)));
            }).switchIfEmpty(Mono.error(new NotFoundCustomException("User Payload not found!")));
    }

second method:

private Mono<Void> validateUser(String uri) {
        return webClientBuilder
                .build()
                .post()
                .uri(uri)
                .accept(MediaType.APPLICATION_JSON)
                .contentType(MediaType.APPLICATION_JSON)
                .header("Authorization", settings.getApiToken())
                .retrieve()
                .onStatus(HttpStatus::isError, clientResponse -> clientResponse.bodyToMono(Error.class)
                        .flatMap(error -> Mono.error(new CustomException(clientResponse.statusCode(), error)))
                ).bodyToMono(Void.class);
    }
  • You're just tricking the inspector here, `repository.save(...)` will still be blocking the reactive stream, `Mono.just(...).subscribeOn(...)` serves no purpose when used as you did. You could do `return Mono.fromSupplier(() -> repository.save(...)).subscribeOn(Schedulers.boundedElastic())` instead, with another following `flatMap` for the `validateUser` part for example, to ensure the request thread pool isn't blocked. – sp00m Feb 11 '21 at 17:52
  • 1
    I fixed up like you told me, what do you think? – EngineerTop Feb 11 '21 at 18:34
  • Yes, I believe something along those lines should do :) – sp00m Feb 11 '21 at 23:30