I am using Spring Webflux, along with netty to build a microservie and I want to capture headers and body of a request and response in the services using webclient. A Web filter configured helps capture request body but not response body. An exchange filter function helps capture response body but not request body. I figured I needed to implement both web filter and exchange filter function.
Here is the controller:
@RestController
public class searchController{
@PostMapping(path="/search")
public Mono<String> searchGroups(@RequestBody searchString, @RequestHeader HttpHeaders httpHeaders)
// loggers here
Mono<String> searchedItem = searchService.search(httpHeaders, searchString);
return searchedItem;
}
Here is the service implementation:
@override
public String search(Httpheaders httpheaders, String searchString){
// process search string here
webclient = WebClient.builder.filter(logResponse).build();
// do some processing here to get list
return searchedItem;
}
Here is the exchange filter function to capture response:
public static ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
if (clientResponse != null) {
prepareJson(clientResponse.headers().asHttpHeaders(), clientResponse.statusCode(), clientResponse.bodyToMono(String.class));
}
return Mono.just(clientResponse);
});
}
Here is the code for webfilter to capture request:
@Override
public Mono<Void> filter(ServerWebExchange webExchange, WebFilterChain chain) {
ServerHttpRequest serverHttpRequest = webExchange.getRequest();
logRequest(serverHttpRequest);
logger.info("getting ip address");
Optional<InetSocketAddress> inetSocketAddress = Optional.ofNullable(webExchange.getRequest().getRemoteAddress());
logger.info("adding forwarded header to hold ip address of the client");
if(inetSocketAddress.isPresent()) {
ServerHttpRequest mutatedRequest = serverHttpRequest.mutate().header("forwarded", "for="+inetSocketAddress.get().getAddress()).build();
logger.info("mutating the request");
webExchange = webExchange.mutate().request(mutatedRequest).build();
}
return chain.filter(webExchange);
}
The web filter is throwing an exception when the request is passed on, and here is the stack trace of the exception that I am seeing:
java.lang.IllegalStateException: DEMAND
at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:428) ~[spring-web-5.3.1.jar:5.3.1]
at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105) ~[spring-web-5.3.1.jar:5.3.1]
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:192) [spring-web-5.3.1.jar:5.3.1]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-9.0.39.jar:9.0.39]
10:30:24.934 stderr ERROR [http-nio-9090-exec-2] ExceptionLibraryHandler.java:45 - Exception due to DEMAND
java.lang.IllegalStateException: DEMAND
at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:428) ~[spring-web-5.3.1.jar:5.3.1]
at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105) ~[spring-web-5.3.1.jar:5.3.1]
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) [reactor-core-3.4.0.jar:3.4.0]
at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:192) [spring-web-5.3.1.jar:5.3.1]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.39.jar:9.0.39]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.39.jar:9.0.39]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.39.jar:9.0.39]
at java.lang.Thread.run(Thread.java:834) [?:?]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.39.jar:9.0.39]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590) [tomcat-embed-core-9.0.39.jar:9.0.39]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.39.jar:9.0.39]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.39.jar:9.0.39]
at java.lang.Thread.run(Thread.java:834) [?:?]
Now my question is, how do I resolve the exception, any hints or clues would help, looking at the exception I wonder scratching my head, where the mistake is.
Here is some useful links that I found, helped me arrive at above solution hoping these might be useful for anybody looking to solve a similar problem:
Hot to get body as String from spring reactive ClientRequest? - This is a good solution but it seems a POJO class should be passed in the method, I cannot pass a POJO, if I pass String.class causes an exception is thrown.
Here is another implementation suggesting the use of exchange filter functions:
https://www.baeldung.com/spring-log-webclient-calls
https://careydevelopment.us/blog/spring-webflux-how-to-log-requests-with-webclient
https://careydevelopment.us/blog/spring-webflux-how-to-log-responses-with-webclient
The above posts suggest setting log level to debug in the underlying netty server, not sure if that is desirable in production environment.
I also looked into using decorators here based on the information found here, but data cannot be cached:
Copy of the request/response body on a Spring reactive app?
and here:
Java Web Flux : How to fetch body from ServerHttpResponse?
When an exception occurs exception handler kicks in and throws the above exception, decorators did not help.
I also built an exception handler based on recommendation here (although it wasn't given higher order): https://github.com/spring-projects/spring-framework/issues/24368