1

I am using Spring Webflux, along with netty to build a microservie and I want to capture headers and body of a request and response in the services using webclient. A Web filter configured helps capture request body but not response body. An exchange filter function helps capture response body but not request body. I figured I needed to implement both web filter and exchange filter function.

Here is the controller:

@RestController
public class searchController{
@PostMapping(path="/search")
public Mono<String> searchGroups(@RequestBody searchString, @RequestHeader HttpHeaders httpHeaders)
// loggers here
Mono<String> searchedItem = searchService.search(httpHeaders, searchString);
return searchedItem;
}

Here is the service implementation:

@override
public String search(Httpheaders httpheaders, String searchString){
// process search string here
    webclient = WebClient.builder.filter(logResponse).build();
// do some processing here to get list
   return searchedItem;
}

Here is the exchange filter function to capture response:

public static ExchangeFilterFunction logResponse() {
       return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
           if (clientResponse != null) {
               prepareJson(clientResponse.headers().asHttpHeaders(), clientResponse.statusCode(), clientResponse.bodyToMono(String.class));
           }
           return Mono.just(clientResponse);
       });
   }

Here is the code for webfilter to capture request:

@Override
public Mono<Void> filter(ServerWebExchange webExchange, WebFilterChain chain) {
        ServerHttpRequest serverHttpRequest = webExchange.getRequest();
        logRequest(serverHttpRequest);
        logger.info("getting ip address");
        Optional<InetSocketAddress> inetSocketAddress = Optional.ofNullable(webExchange.getRequest().getRemoteAddress());
        logger.info("adding forwarded header to hold ip address of the client");
        if(inetSocketAddress.isPresent()) {
            ServerHttpRequest mutatedRequest = serverHttpRequest.mutate().header("forwarded", "for="+inetSocketAddress.get().getAddress()).build();
            logger.info("mutating the request");
            webExchange = webExchange.mutate().request(mutatedRequest).build();
        }
        return chain.filter(webExchange);
    }

The web filter is throwing an exception when the request is passed on, and here is the stack trace of the exception that I am seeing:

java.lang.IllegalStateException: DEMAND
    at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:428) ~[spring-web-5.3.1.jar:5.3.1]
    at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105) ~[spring-web-5.3.1.jar:5.3.1]
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:192) [spring-web-5.3.1.jar:5.3.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-9.0.39.jar:9.0.39]
10:30:24.934 stderr ERROR [http-nio-9090-exec-2] ExceptionLibraryHandler.java:45 - Exception due to DEMAND
java.lang.IllegalStateException: DEMAND
    at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:428) ~[spring-web-5.3.1.jar:5.3.1]
    at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105) ~[spring-web-5.3.1.jar:5.3.1]
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:192) [spring-web-5.3.1.jar:5.3.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at java.lang.Thread.run(Thread.java:834) [?:?]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.39.jar:9.0.39]
    at java.lang.Thread.run(Thread.java:834) [?:?]

Now my question is, how do I resolve the exception, any hints or clues would help, looking at the exception I wonder scratching my head, where the mistake is.

Here is some useful links that I found, helped me arrive at above solution hoping these might be useful for anybody looking to solve a similar problem:

Hot to get body as String from spring reactive ClientRequest? - This is a good solution but it seems a POJO class should be passed in the method, I cannot pass a POJO, if I pass String.class causes an exception is thrown.

Here is another implementation suggesting the use of exchange filter functions:

https://www.baeldung.com/spring-log-webclient-calls

https://careydevelopment.us/blog/spring-webflux-how-to-log-requests-with-webclient

https://careydevelopment.us/blog/spring-webflux-how-to-log-responses-with-webclient

The above posts suggest setting log level to debug in the underlying netty server, not sure if that is desirable in production environment.

I also looked into using decorators here based on the information found here, but data cannot be cached:

Copy of the request/response body on a Spring reactive app?

and here:

Java Web Flux : How to fetch body from ServerHttpResponse?

When an exception occurs exception handler kicks in and throws the above exception, decorators did not help.

I also built an exception handler based on recommendation here (although it wasn't given higher order): https://github.com/spring-projects/spring-framework/issues/24368

Anvesh Raavi
  • 303
  • 1
  • 3
  • 14
  • there is too little information about your application, pom.xml, how is this executed, what server are you using netty, undertow, etc. Showing two classes and nothing else is way too little information. Cant reproduce voted to close. – Toerktumlare Jul 22 '21 at 21:33

0 Answers0