It makes sense that the response body should only be consumed once. This happens in a filter that I have for logging purposes that is split over two classes/methods. Here is the filter itself:
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
Logger logger = loggingHelper.loggerFor(request);
if (loggingHelper.shouldLogRequestsAndResponses(logger)) {
String loggingId = loggingHelper.getOrCreateCorrelationId();
return next.exchange(request)
.flatMap(clientResponse -> messageCreator.formatMessage(clientResponse, loggingId))
.doOnNext(responseData ->
logger.trace("{" + responseData.getLogMessage() + "}"))
.map(ResponseData::getResponse);
} else {
return next.exchange(request);
}
}
And here is the formatMessage
from the messageCreator
.
public Mono<ResponseData> formatMessage(ClientResponse response, String loggingId) {
String responseJson = loggingUtils.createResponseLoggingJson(response, loggingId);
ResponseData responseData = new ResponseData(response, responseJson);
return response.bodyToMono(DataBuffer.class)
.map(data -> {
ClientResponse clones = response.mutate()
.body(
Flux.just(data))
.build();
String body = data.toString(StandardCharsets.UTF_8);
responseData.add("\"" + BODY_FIELD + "\":\"" + body + "\"");
return new ResponseData(clones, responseData.getLogMessage());
})
.defaultIfEmpty(responseData);
}
As you can see I consume the original response body to get a DataBuffer
used to create a cloned ClientResponse, which is then used in the filter to continue in the chain.
I do get the the actual response out of the client and I get the log message. Never the less there is an exception:
2022-05-13 09:47:36,530 | 10788 | httpclient-dispatch-1 | | ERROR | reactor.core.publisher.Operators | Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: The client response body can only be consumed once.
Caused by: java.lang.IllegalStateException: The client response body can only be consumed once.
at org.springframework.http.client.reactive.HttpComponentsClientHttpResponse.lambda$getBody$1(HttpComponentsClientHttpResponse.java:106)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Body from GET ... [DefaultClientResponse]
Original Stack Trace:
at org.springframework.http.client.reactive.HttpComponentsClientHttpResponse.lambda$getBody$1(HttpComponentsClientHttpResponse.java:106)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:162)
at org.apache.hc.core5.reactive.ReactiveDataConsumer.subscribe(ReactiveDataConsumer.java:166)
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
at reactor.core.publisher.Flux.subscribe(Flux.java:8306)
at org.springframework.web.reactive.function.client.DefaultClientResponseBuilder.releaseBody(DefaultClientResponseBuilder.java:195)
at org.springframework.web.reactive.function.client.DefaultClientResponseBuilder.body(DefaultClientResponseBuilder.java:177)
at com.netcetera.mobaint.utils.filter.message.BaseResponseMessageCreator.lambda$formatMessage$0(BaseResponseMessageCreator.java:39)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at org.apache.hc.core5.reactive.ReactiveDataConsumer.flushToSubscriber(ReactiveDataConsumer.java:155)
at org.apache.hc.core5.reactive.ReactiveDataConsumer.streamEnd(ReactiveDataConsumer.java:113)
at org.apache.hc.core5.reactive.ReactiveResponseConsumer.streamEnd(ReactiveResponseConsumer.java:162)
at org.apache.hc.client5.http.impl.async.HttpAsyncMainClientExec$1.streamEnd(HttpAsyncMainClientExec.java:233)
at org.apache.hc.core5.http.impl.nio.ClientHttp1StreamHandler.dataEnd(ClientHttp1StreamHandler.java:280)
at org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer.dataEnd(ClientHttp1StreamDuplexer.java:366)
at org.apache.hc.core5.http.impl.nio.AbstractHttp1StreamDuplexer.onInput(AbstractHttp1StreamDuplexer.java:333)
at org.apache.hc.core5.http.impl.nio.AbstractHttp1IOEventHandler.inputReady(AbstractHttp1IOEventHandler.java:64)
at org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler.inputReady(ClientHttp1IOEventHandler.java:39)
at org.apache.hc.core5.reactor.ssl.SSLIOSession.decryptData(SSLIOSession.java:550)
at org.apache.hc.core5.reactor.ssl.SSLIOSession.access$400(SSLIOSession.java:72)
at org.apache.hc.core5.reactor.ssl.SSLIOSession$1.inputReady(SSLIOSession.java:172)
at org.apache.hc.core5.reactor.InternalDataChannel.onIOEvent(InternalDataChannel.java:131)
at org.apache.hc.core5.reactor.InternalChannel.handleIOEvent(InternalChannel.java:51)
at org.apache.hc.core5.reactor.SingleCoreIOReactor.processEvents(SingleCoreIOReactor.java:178)
at org.apache.hc.core5.reactor.SingleCoreIOReactor.doExecute(SingleCoreIOReactor.java:127)
at org.apache.hc.core5.reactor.AbstractSingleCoreIOReactor.execute(AbstractSingleCoreIOReactor.java:85)
at org.apache.hc.core5.reactor.IOReactorWorker.run(IOReactorWorker.java:44)
at java.base/java.lang.Thread.run(Thread.java:829)
This occurs on the body
call upon the mutated response.
Is there any way to prevent this exception to occur or how would I be able to catch the exception so that it does no longer turn up as an error?
Addendum:
The root cause looks similar to How to log request body in spring Webflux Java which is true. However that issue is at the other end of the problem, where we are the server and get a request and return a response. This issue is about using the WebClient
to make a request and receive the response. For that reason the proposed solution using a WebFilter
is not applicable here - or at least I do not see how.