3

In How to create a Spring Reactor Flux from Http integration flow? artem-bilan mentioned in a comment that it will be possible to use the webflux integration in the future.

Since the time when the comment was written, the WebFlux integration has been factored out to spring-integration-webflux. I have tried the following to replicate the MVC based http->flux integration with a WebFlux based one by replacing the Http.inboundChannelAdapter and the @GetRequest handler of the MVC version with a WebFlux.inboundChannelAdapter and WebFlux.inboundGateway:

@SpringBootApplication
public class WebfluxApplication {

  public static void main(String[] args) {
    SpringApplication.run(WebfluxApplication.class, args);
  }


  @Bean
  public Publisher<Message<String>> reactiveSource() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/message/{id}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .payloadExpression("#pathVariables.id")
            )
            .log()
            .channel(MessageChannels.flux())
            .toReactivePublisher();
  }


  @Bean
  public IntegrationFlow eventMessages() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/events")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> reactiveSource())                
            .get();
}

}

It appears that the flow in the reactiveSource() publisher does not receive any messages, at least nothing is logged for my .log() statement.

When I replace the reactiveSource() publisher in the eventMessages flow

.handle((p, h) -> reactiveSource()) 

by a fake publisher

.handle((p, h) -> Flux.just("foo", "bar"))

I get SSE responses from

curl localhost:8080/events

The trace log shows that the reactiveSource() POST handler is mapped and the WebFluxInboundEndpoint.handle method is being invoked:

2018-05-05 16:50:58.788  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/message/{id}],methods=[POST]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:58.789  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/events],methods=[GET || POST],produces=[text/event-stream]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:59.191  INFO 6552 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-05-05 16:50:59.192  INFO 6552 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2018-05-05 16:50:59.196  INFO 6552 --- [           main] d.e.sample.webflux.WebfluxApplication    : Started WebfluxApplication in 2.608 seconds (JVM running for 3.419)
2018-05-05 16:51:06.918 DEBUG 6552 --- [ctor-http-nio-2] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:06.932 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:06.933 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@775cdb20]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:06.967 DEBUG 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@71f648a3]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:11.364 DEBUG 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method

Why is that?

dschulten
  • 2,994
  • 1
  • 27
  • 44
  • While editing I tried to assign a stackoverflow spring-integration-webflux tag, but that doesn't exist yet. Maybe someone with enough reputation can create it. – dschulten May 05 '18 at 10:26
  • How does it work if you do like this: `.habdle((p, h) -> Flux.from(reactiveSource()))` ? – Artem Bilan May 05 '18 at 12:10
  • unfortunately, I get the same result: no log output in the reactiveSource() flow, no output on the /events resource. I have added the trace log which shows that the debug log was misleading: the Webflux inbound endpoint in fact *does* get invoked. – dschulten May 05 '18 at 14:57
  • it appears that processing stops in WebFluxInboundEndpoint when the POST has no body. – dschulten May 05 '18 at 15:10

1 Answers1

2

The reason appears to be that WebFluxInboundEndpoint stops processing POST requests without body in doHandle(), the line

.map(body -> new HttpEntity<>(...)) 

is never executed if there is no request body content:

private Mono<Void> doHandle(ServerWebExchange exchange) {
    return extractRequestBody(exchange)
            .doOnSubscribe(s -> this.activeCount.incrementAndGet())
            .map(body -> new HttpEntity<>(body, exchange.getRequest().getHeaders()))
            .map(entity -> buildMessage(entity, exchange))
            .flatMap(requestMessage -> {
                if (this.expectReply) {
                    return sendAndReceiveMessageReactive(requestMessage)
                            .flatMap(replyMessage -> populateResponse(exchange, replyMessage));
                }
                else {
                    send(requestMessage);
                    return setStatusCode(exchange);
                }
            })
            .doOnTerminate(this.activeCount::decrementAndGet);

}

Workaround: the caller must send any non-empty request body to make it work, e.g. a single quote passed with -d is sufficient:

curl -d ' http://localhost:8080/message/4

With such a request, my log contains the incoming GenericMessage as expected and the /events resource starts producing SSE.

2018-05-05 17:25:24.777 TRACE 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 17:25:24.777 DEBUG 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
2018-05-05 17:25:24.778  INFO 40436 --- [ctor-http-nio-8] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=4, headers={http_requestMethod=POST, Accept=*/*, User-Agent=curl/7.49.1, http_requestUrl=http://localhost:8080/message/4, Host=localhost:8080, id=9a09294d-280a-af3b-0894-23597cf1cb5f, Content-Length=1, contentType=application/x-www-form-urlencoded, timestamp=1525533924778}]
dschulten
  • 2,994
  • 1
  • 27
  • 44
  • That’s great catch! You can accept your own answer though. I think you can raise a JIRA on the matter and we will revise what to do there in case of empty body. Your SSE sample is great though. May be you wouldn’t mind to contribute it back to SI Samples? – Artem Bilan May 05 '18 at 16:21
  • Good idea. The example would use a POST with body, though and maybe mention the issue. BTW Can you create a spring-integration-webflux StackOverflow tag (requires 1500 reputation)? – dschulten May 05 '18 at 17:17
  • There is no reason in such a fine-grained tagging - the common `spring-integration` is fully enough. I monitor all of them. Thank you for JIRA and considering contribution! – Artem Bilan May 05 '18 at 17:46