I have experience with asynchronous libraries like Vert.x but am new to Reactor/WebFlux specifically. I want to expose a single endpoint on a web application that, when hit, calls another web service, parses the response into a Java object, then accesses fields within the object and does something with them. I am using WebClient to make the HTTP call and Jackson's ObjectMapper to deserialize the response. My code looks roughly like this (note: RequestUtil.safeDeserialize just uses Jackson to parse the string body into an object and returns Optional<Object>, which is why I have an additional map step afterwards):
public Mono<String> function(final String param) {
    final String encodedRequestBody = RequestUtil.encodeRequestBody(param);
    final Mono<Response> responseMono = webClient.post()
            .uri("/endpoint")
            .header("Content-Type", "application/x-www-form-urlencoded")
            .header("Authorization", "Basic " + basicAuthHeader)
            .accept(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromPublisher(Mono.just(encodedRequestBody), String.class))
            .exchange()
            .flatMap(clientResponseMono -> clientResponseMono.bodyToMono(String.class))
            .map(RequestUtil::safeDeserialize)
            .map(resp -> resp.orElseThrow(() -> new RuntimeException("Failed to deserialize Oscar response!")));
    responseMono.subscribe(response -> {
        // Pull out some of the fields from the `Response` type object and do something with them
    });
    return responseMono.map(Response::aStringField);
}
After performance testing this code against an identical application that follows the exact same logic but makes the HTTP call via the blocking Java 11 HttpClient class, I see almost no difference between the two. In fact, the WebClient implementation is slightly less performant than the blocking implementation.
Clearly I made a mistake somewhere either with the code or my mental model of what's going on here, so any help/advice is very appreciated. Thanks!
Edit: Based on the advice in @Toerktumlare's response, I have updated the function to the following:
public Mono<String> function(final String param) {
    final Mono<String> encodedRequestBody = RequestUtil.encodeRequestBodyToMono(param);
    final Mono<Response> responseMono = webClient.post()
            .uri("/endpoint")
            .header("Content-Type", "application/x-www-form-urlencoded")
            .header("Authorization", "Basic " + basicAuthHeader)
            .accept(MediaType.APPLICATION_JSON)
            .body(encodedRequestBody, String.class)
            .retrieve()
            .bodyToMono(Response.class);
    return responseMono.flatMap(response -> {
        final String field = response.field();
        // Use `field` to do something that would produce a log message
        logger.debug("Field is: {}", field);
        return Mono.just(field);
    });
}
When running this code, I don't see any logging. This makes me think that the HTTP call isn't actually happening (or completing in time?), because when I use subscribe with the same WebClient code, I can successfully print out fields from the response. What am I missing?
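One way to picture the behaviour described above: a Mono only describes work, and nothing runs until something subscribes to it. The sketch below is a plain-Java analogy, not Reactor itself. `LazyCall`, `LazyCallSketch`, and `fakeHttpCall` are hypothetical names invented for illustration, and a `Supplier` stands in for the HTTP request.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class LazyCallSketch {

    // Hypothetical stand-in for a cold Mono: it describes work but does not start it.
    static final class LazyCall<T> {
        private final Supplier<T> source;

        LazyCall(Supplier<T> source) {
            this.source = source;
        }

        // Analogous to Mono.map: builds a new description, the source is not invoked here.
        <R> LazyCall<R> map(Function<T, R> fn) {
            return new LazyCall<>(() -> fn.apply(source.get()));
        }

        // Analogous to Mono.subscribe: the only place where the source actually runs.
        void subscribe(Consumer<T> onValue) {
            onValue.accept(source.get());
        }
    }

    // Counts how many times the simulated HTTP call is made.
    static final AtomicInteger httpCalls = new AtomicInteger();

    // Hypothetical stand-in for the WebClient chain built inside function(param).
    static LazyCall<String> fakeHttpCall(String param) {
        return new LazyCall<>(() -> {
            httpCalls.incrementAndGet();
            return "response-for-" + param;
        });
    }

    public static void main(String[] args) {
        LazyCall<String> response = fakeHttpCall("a");
        LazyCall<Integer> length = response.map(String::length);
        // Assembling the chain alone triggers no call.
        System.out.println("calls after assembly: " + httpCalls.get()); // 0

        // Each subscription re-runs the source from the start.
        response.subscribe(body -> System.out.println("side effect: " + body));
        length.subscribe(len -> System.out.println("returned value: " + len));
        System.out.println("calls after two subscriptions: " + httpCalls.get()); // 2
    }
}
```

If the analogy holds, the first version of function would issue the HTTP request twice whenever the caller also subscribes to the returned Mono (once for the explicit subscribe, once for the returned chain), while the second version issues no request at all unless the returned Mono is eventually subscribed.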
Edit2: This function is being used to serve responses to an endpoint (a few lines of code are omitted for conciseness):
@Bean
public RouterFunction<ServerResponse> routerFunction(ResponseHandler handler) {
    return RouterFunctions.route(RequestPredicates.GET("/my/endpoint")
            .and(RequestPredicates.accept(MediaType.ALL)), handler::endpoint);
}

public Mono<ServerResponse> endpoint(ServerRequest request) {
    // Pull out a comma-separated list from the request
    final List<String> params = Arrays.asList(fieldFromRequest.split(","));
    // For each param, call function(param), roll into list
    List<Mono<String>> results = params.stream()
            .map(nonBlockingClient::function)
            .collect(Collectors.toList());
    // Check to see if any of the requests failed
    if (results.size() != params.size()) {
        return ServerResponse.status(500).build();
    }
    logger.debug("Success");
    return ServerResponse.ok().build();
}
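A possible explanation for the handler above is that the list only holds unsubscribed Mono instances, so the size check can never detect a failed request. The sketch below illustrates that idea in plain Java, with a `Supplier<String>` standing in for `Mono<String>`. `UnsubscribedListSketch`, `assemble`, and the simulated failure are hypothetical and not taken from the real application.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class UnsubscribedListSketch {

    // Counts how many deferred calls were actually executed.
    static final AtomicInteger executed = new AtomicInteger();

    // Hypothetical stand-in for nonBlockingClient.function(param):
    // the Supplier plays the role of an unsubscribed Mono<String>.
    static Supplier<String> function(String param) {
        return () -> {
            executed.incrementAndGet();
            if (param.isEmpty()) {
                throw new IllegalStateException("simulated downstream failure");
            }
            return param.toUpperCase();
        };
    }

    // Mirrors the stream().map(...).collect(...) step in endpoint().
    static List<Supplier<String>> assemble(List<String> params) {
        return params.stream()
                .map(UnsubscribedListSketch::function)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The empty middle param would fail, but only if its call were ever run.
        List<String> params = Arrays.asList("a", "", "c");
        List<Supplier<String>> results = assemble(params);

        // One deferred call per param, so the sizes always match.
        System.out.println(results.size() == params.size()); // true
        // Nothing was executed, so no failure could have been observed.
        System.out.println(executed.get()); // 0
    }
}
```

In Reactor terms, the usual alternative is to keep everything in one chain, for example building the result with Flux.fromIterable(params).flatMap(...).collectList() and deriving the returned Mono<ServerResponse> from that, so that WebFlux subscribes to the whole pipeline when it handles the request. Whether that fits here depends on the omitted lines.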