I understand that a response body can only be consumed once. In my case the error comes from a logging filter whose logic is split across two classes. Here is the filter itself:

public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
  Logger logger = loggingHelper.loggerFor(request);
  if (loggingHelper.shouldLogRequestsAndResponses(logger)) {
    String loggingId = loggingHelper.getOrCreateCorrelationId();

    return next.exchange(request)
      .flatMap(clientResponse -> messageCreator.formatMessage(clientResponse, loggingId))
      .doOnNext(responseData ->
        logger.trace("{" + responseData.getLogMessage() + "}"))
      .map(ResponseData::getResponse);
  } else {
    return next.exchange(request);
  }
}

And here is the `formatMessage` method of the `messageCreator`:

public Mono<ResponseData> formatMessage(ClientResponse response, String loggingId) {
  String responseJson = loggingUtils.createResponseLoggingJson(response, loggingId);
  ResponseData responseData = new ResponseData(response, responseJson);
  return response.bodyToMono(DataBuffer.class)
    .map(data -> {
      ClientResponse clones = response.mutate()
        .body(
          Flux.just(data))
        .build();
      String body = data.toString(StandardCharsets.UTF_8);
      responseData.add("\"" + BODY_FIELD + "\":\"" + body + "\"");
      return new ResponseData(clones, responseData.getLogMessage());
    })
    .defaultIfEmpty(responseData);
}

As you can see, I consume the original response body to get a DataBuffer, which I use to create a cloned ClientResponse. The filter then passes that clone on down the chain.

I do get the actual response out of the client, and I get the log message. Nevertheless, an exception is logged:

2022-05-13 09:47:36,530 | 10788 | httpclient-dispatch-1 | | ERROR | reactor.core.publisher.Operators | Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: The client response body can only be consumed once.
Caused by: java.lang.IllegalStateException: The client response body can only be consumed once.
    at org.springframework.http.client.reactive.HttpComponentsClientHttpResponse.lambda$getBody$1(HttpComponentsClientHttpResponse.java:106)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ Body from GET ... [DefaultClientResponse]
Original Stack Trace:
        at org.springframework.http.client.reactive.HttpComponentsClientHttpResponse.lambda$getBody$1(HttpComponentsClientHttpResponse.java:106)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:162)
        at org.apache.hc.core5.reactive.ReactiveDataConsumer.subscribe(ReactiveDataConsumer.java:166)
        at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
        at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8306)
        at org.springframework.web.reactive.function.client.DefaultClientResponseBuilder.releaseBody(DefaultClientResponseBuilder.java:195)
        at org.springframework.web.reactive.function.client.DefaultClientResponseBuilder.body(DefaultClientResponseBuilder.java:177)
        at com.netcetera.mobaint.utils.filter.message.BaseResponseMessageCreator.lambda$formatMessage$0(BaseResponseMessageCreator.java:39)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
        at org.apache.hc.core5.reactive.ReactiveDataConsumer.flushToSubscriber(ReactiveDataConsumer.java:155)
        at org.apache.hc.core5.reactive.ReactiveDataConsumer.streamEnd(ReactiveDataConsumer.java:113)
        at org.apache.hc.core5.reactive.ReactiveResponseConsumer.streamEnd(ReactiveResponseConsumer.java:162)
        at org.apache.hc.client5.http.impl.async.HttpAsyncMainClientExec$1.streamEnd(HttpAsyncMainClientExec.java:233)
        at org.apache.hc.core5.http.impl.nio.ClientHttp1StreamHandler.dataEnd(ClientHttp1StreamHandler.java:280)
        at org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer.dataEnd(ClientHttp1StreamDuplexer.java:366)
        at org.apache.hc.core5.http.impl.nio.AbstractHttp1StreamDuplexer.onInput(AbstractHttp1StreamDuplexer.java:333)
        at org.apache.hc.core5.http.impl.nio.AbstractHttp1IOEventHandler.inputReady(AbstractHttp1IOEventHandler.java:64)
        at org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler.inputReady(ClientHttp1IOEventHandler.java:39)
        at org.apache.hc.core5.reactor.ssl.SSLIOSession.decryptData(SSLIOSession.java:550)
        at org.apache.hc.core5.reactor.ssl.SSLIOSession.access$400(SSLIOSession.java:72)
        at org.apache.hc.core5.reactor.ssl.SSLIOSession$1.inputReady(SSLIOSession.java:172)
        at org.apache.hc.core5.reactor.InternalDataChannel.onIOEvent(InternalDataChannel.java:131)
        at org.apache.hc.core5.reactor.InternalChannel.handleIOEvent(InternalChannel.java:51)
        at org.apache.hc.core5.reactor.SingleCoreIOReactor.processEvents(SingleCoreIOReactor.java:178)
        at org.apache.hc.core5.reactor.SingleCoreIOReactor.doExecute(SingleCoreIOReactor.java:127)
        at org.apache.hc.core5.reactor.AbstractSingleCoreIOReactor.execute(AbstractSingleCoreIOReactor.java:85)
        at org.apache.hc.core5.reactor.IOReactorWorker.run(IOReactorWorker.java:44)
        at java.base/java.lang.Thread.run(Thread.java:829)

According to the stack trace, the exception is thrown by the `body(...)` call on the builder obtained from `response.mutate()`.
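
A minimal plain-Java sketch of what the stack trace suggests is happening, with no Spring types involved: `body(...)` seems to release whatever body the builder held before, and for a builder obtained from `mutate()` that would be the original, already consumed body. All class names below (`OneShotBody`, `ResponseCopyBuilder`, `ReplayDemo`) are hypothetical stand-ins for illustration, not Spring APIs, and the model is an assumption about the mechanism rather than the framework's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical stand-in for a response body that may be read only once. */
final class OneShotBody {
    private final byte[] bytes;
    private final AtomicBoolean consumed = new AtomicBoolean(false);

    OneShotBody(String content) {
        this.bytes = content.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns the bytes on the first call and throws on every later call. */
    byte[] consume() {
        if (!consumed.compareAndSet(false, true)) {
            throw new IllegalStateException("The body can only be consumed once.");
        }
        return bytes;
    }
}

/** Hypothetical builder that drains its previous body source when a new body is set. */
final class ResponseCopyBuilder {
    private Supplier<byte[]> body;

    private ResponseCopyBuilder(Supplier<byte[]> initialBody) {
        this.body = initialBody;
    }

    /** Seeds the builder with the original body, like a "mutate"-style copy. */
    static ResponseCopyBuilder mutating(OneShotBody original) {
        return new ResponseCopyBuilder(original::consume);
    }

    /** Seeds the builder with an empty body, like a response built from scratch. */
    static ResponseCopyBuilder fresh() {
        return new ResponseCopyBuilder(() -> new byte[0]);
    }

    ResponseCopyBuilder body(byte[] replacement) {
        body.get(); // drain the previous source; throws if it was already consumed
        body = () -> replacement;
        return this;
    }

    byte[] build() {
        return body.get();
    }
}

final class ReplayDemo {
    public static void main(String[] args) {
        OneShotBody original = new OneShotBody("{\"status\":\"ok\"}");
        byte[] buffered = original.consume(); // first and only legal read, e.g. for logging

        try {
            // Replacing the body drains the original a second time and fails.
            ResponseCopyBuilder.mutating(original).body(buffered);
        } catch (IllegalStateException e) {
            System.out.println("mutating copy failed: " + e.getMessage());
        }

        // A builder that never held the original body can replay the buffered bytes.
        byte[] replayed = ResponseCopyBuilder.fresh().body(buffered).build();
        System.out.println("fresh copy body: " + new String(replayed, StandardCharsets.UTF_8));
    }
}
```

If the model holds, the exception is a side effect of replacing the body rather than a sign that the buffered data is lost, which would match the observation that the response and the log message both arrive intact.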

Is there a way to prevent this exception from occurring, or failing that, to catch it so that it no longer shows up as an error?

Addendum:

The root cause looks similar to that of "How to log request body in spring Webflux Java", and it probably is the same. However, that question covers the other end of the problem, where the application is the server that receives a request and returns a response. This question is about using the WebClient to make a request and receive the response. For that reason the solution proposed there, a WebFilter, is not applicable here, or at least I do not see how.

hotzst
  • Does this answer your question? [How to log request body in spring Webflux Java](https://stackoverflow.com/questions/61706948/how-to-log-request-body-in-spring-webflux-java) – Alex May 16 '22 at 14:43
  • @Alex Not really as that one focuses on logging incoming requests (and possibly the responses). The issue I have here is when using a `WebClient` that sends out the requests and receives the responses. – hotzst May 17 '22 at 08:27
  • From what I see the root cause is the same - you are trying to consume body in filter and then in the client itself. – Alex May 18 '22 at 02:05
