
I have a question very similar to this one: How to create a Spring Reactor Flux from a ActiveMQ queue?

The one difference is that the messages come from an HTTP endpoint rather than a JMS queue. The problem is that the message channel does not get populated for some reason, or it is not picked up by Flux.from(). The log entries show that a GenericMessage is created by the HTTP integration flow with the path variable as its payload, but it does not seem to be enqueued/published to a channel. I tried both .channel(MessageChannels.queue()) and .channel(MessageChannels.publishSubscribe()); it makes no difference, and the event stream stays empty. Here is the code:

@Bean
public Publisher<Message<String>> httpReactiveSource() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("/eventmessage/{id}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .payloadExpression("#pathVariables.id")
            )
            .channel(MessageChannels.queue())
            .log(LoggingHandler.Level.DEBUG)
            .log()
            .toReactivePublisher();
}

@GetMapping(value = "eventmessagechannel/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id) {
    return Flux.from(httpReactiveSource())
            .map(Message::getPayload);
}

UPDATE1:

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.0.M2'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-freemarker')
    compile('org.springframework.boot:spring-boot-starter-integration') 
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.springframework.boot:spring-boot-starter-webflux') 
    compile('org.springframework.integration:spring-integration-http')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')

}

UPDATE2:

It works when @SpringBootApplication and @RestController are defined in one file, but it stops working when they are in separate files.

TestApp.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class TestApp {
    public static void main(String[] args) {
        SpringApplication.run(TestApp.class, args);
    }
}

TestController.java

package com.example.controller;


import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;



@RestController
public class TestController {

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows
                .from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }

}
Mike
  • What Spring Boot configuration (`pom`) do you use? How do you mix the Servlet container with WebFlux? – Artem Bilan Jul 13 '17 at 19:40
  • Hi Artem, thank you for looking into it. I am trying it with all the defaults of `SpringBootApplication v. '2.0.0.M2'`. I have a `@RestController` with the two methods shown above. It is basically the same (config-wise) as in your answer to the ActiveMQ question, which works for me. But instead of a JMS queue, I receive the messages from an HTTP REST call. – Mike Jul 14 '17 at 07:58
  • I have just updated the question with gradle dependencies being used. Thanks! – Mike Jul 14 '17 at 08:17
  • Ok. Thanks for the update. I see that you have both `web` and `webflux` dependencies. How does it work just with `web`? I can't check locally because Boot is broken a bit now – Artem Bilan Jul 14 '17 at 11:52
  • Thank you for getting back. OK, I created a new quick project and removed the dependency on `spring-boot-starter-webflux` from it, but it looks like that did not help. Still, no events are being sent. – Mike Jul 14 '17 at 13:07

1 Answer


This works well for me:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }

}

I have these dependencies in the POM:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

I run the app and use two terminals. In the first one I run:

curl http://localhost:8080/events

to listen to SSEs.

And in the second one I perform this:

curl -X POST http://localhost:8080/message/foo

curl -X POST http://localhost:8080/message/bar

curl -X POST http://localhost:8080/message/666

The first terminal then responds with:

data:foo

data:bar

data:666

Note that we don't need the spring-boot-starter-webflux dependency: returning a Flux as SSE works well with regular Spring MVC on the Servlet container.

Spring Integration will support WebFlux soon, too: https://jira.spring.io/browse/INT-4300. You will then be able to configure something like:

IntegrationFlows
        .from(Http.inboundReactiveGateway("/sse")
                .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

That lets you rely fully on WebFlux alone, without any Servlet container dependencies.

Artem Bilan
  • Hi Artem, thank you again for your efforts and explanations. Your code works, and it has helped to narrow the problem down: for some reason it works when `@SpringBootApplication` and `@RestController` are defined in one file, but not when they are in separate files (which actually sounds more like a real-life case). – Mike Jul 15 '17 at 09:47
  • Look, the problem is the missing `@Configuration` on your `TestController`. True, the `httpReactiveSource()` `@Bean` is processed and registered properly thanks to the "lite" configuration mechanism. But since you call the `httpReactiveSource()` method directly from the `eventMessages()` method, there is no proxy call, and `httpReactiveSource()` doesn't delegate to the bean factory for proper bean resolution. So, really consider moving the `httpReactiveSource()` definition to a `@Configuration` class and using `@Autowired` in the `TestController` for the `Publisher<Message<String>>`. – Artem Bilan Jul 15 '17 at 18:36
  • That was it, Artem! It works now when defined via `@Configuration`. We have finally got to the bottom of the problem! Thanks! You guys are doing an awesome job on this framework!
 – Mike Jul 16 '17 at 08:00
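
For later readers, the fix agreed on in the comments above can be sketched roughly as follows. This is a minimal sketch, not the exact code either poster ended up with. The class name `HttpSourceConfig` and the constructor injection are assumptions made for illustration; everything else is taken from the question's `TestController`. The single-file version worked because `@SpringBootApplication` is itself meta-annotated with `@Configuration`, so `@Bean` method calls there go through the proxy. On a plain `@RestController`, a direct call to `httpReactiveSource()` is, as far as I understand it, an ordinary Java method call that builds a fresh flow object rather than returning the registered bean.

```java
package com.example.controller;

import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

// Hypothetical class name. The point is only that the @Bean method now lives
// in a full @Configuration class, so the container registers and manages it.
@Configuration
class HttpSourceConfig {

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows
                .from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r.methods(HttpMethod.POST))
                        .payloadExpression("#pathVariables.id"))
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }
}

@RestController
public class TestController {

    // The registered bean is injected; the @Bean method is never called directly.
    private final Publisher<Message<String>> httpReactiveSource;

    @Autowired
    public TestController(Publisher<Message<String>> httpReactiveSource) {
        this.httpReactiveSource = httpReactiveSource;
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(this.httpReactiveSource)
                .map(Message::getPayload);
    }
}
```

With this layout, the two `curl` commands from the answer (`/events` in one terminal, `POST /message/{id}` in the other) should behave the same way as in the single-file version.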