
I'm trying to get Spring Boot working with reactive MongoDB and EventSource. However, the client keeps reopening the connection because the server closes it. I even doubt whether this can work at all, since I couldn't find any working example that combines a reactive database with EventSource.

Could you please point me to a working example or tell me what's wrong with my code?

Here are the main parts of the code:

pom.xml

<properties>
  <java.version>1.8</java.version>
  <junit-jupiter.version>5.3.2</junit-jupiter.version>
</properties>

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.5.RELEASE</version>
</parent>

<dependencies>

<!-- webflux reactive -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- thymeleaf -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

<!-- exclude junit 4, prefer junit 5 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
  <exclusions>
    <exclusion>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- junit 5 -->
<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter-engine</artifactId>
  <version>${junit-jupiter.version}</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-devtools</artifactId>
  <optional>true</optional>
</dependency>

<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

</dependencies>

As you can see in the pom, I'm using the embedded Tomcat (I already tried Netty, the default Spring Boot server). Also, I'm not deploying the app to any remote server; I'm just running it on my local machine (Windows 10).

Web:

    let source = new EventSource("/comment/stream");

    source.addEventListener("message", function (event) {
        // These events are JSON, so parsing and DOM fiddling are needed
        var comment = JSON.parse(event.data);
        console.log(comment ); 
    });

    source.addEventListener("error", function (event) {
      console.log("error", event);
      this.close();
    });

RestController:

@RestController
public class CommentController {

  @Autowired
  private CommentRepository commentRepository;

  @PostMapping(path = "/comment")
  public Mono<Comment> comment(@RequestBody Comment comment) {
    return this.commentRepository.save(comment);
  }

  @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<Comment> feed() {
    return this.commentRepository.findAll();
  }

}

DB Repository:

@Repository
public interface CommentRepository extends ReactiveSortingRepository<Comment, String> {

 Flux<Comment> findAll();
}

Again, the web client that uses EventSource keeps reconnecting every second because the server closes the connection.

Thank you!

1 Answer


I'm not really sure; you are giving us too little information about why your connection is closing. There are no logs, and you haven't said anything about where it is being deployed.

I will only answer this question based on personal experience. I deployed an application that uses event streams to Heroku, which puts a proxy/load balancer in front of every application. It kills any connection that has sent nothing after up to 60 seconds.

The question "Why are event sources closed after 30-60 sec" confirms what I have been noticing.

To work around this, you can implement ping/pong messages if you are using WebSockets, or, if you are using Server-Sent Events as I did, send keep-alive messages.

    .GET("", accept(TEXT_EVENT_STREAM), request -> ok()
        .contentType(TEXT_EVENT_STREAM)
        .header("Cache-Control", "no-transform")
        // merge the real events with a keep-alive comment emitted every 15 seconds
        .body(Flux.merge(myHandler.getEvents(),
                  Flux.interval(Duration.ofSeconds(15))
                          .map(aLong -> ServerSentEvent.builder()
                                                       .comment("keep alive")
                                                       .build())),
        new ParameterizedTypeReference<List<MyClass>>() {}))

I took this code snippet from one of my projects. It merges my current stream with a Flux that, at a fixed interval (15 seconds), emits a ServerSentEvent containing only a keep-alive comment. Since it is a comment, the client ignores it.

Note that the regular stream, myHandler.getEvents, returns data wrapped in ServerSentEvents.
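
To illustrate why a keep-alive comment never reaches the client's message listener, here is a minimal sketch of the Server-Sent Events wire format in plain Java. The class SseFrames and its methods are hypothetical helpers written for this example, not part of Spring or Reactor, and the parser is deliberately simplified (single-line payloads only). In the event-stream format, a line that starts with a colon is a comment, so it keeps bytes flowing through a proxy without producing an event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring class: it only illustrates the SSE wire format.
class SseFrames {

    // A data event: a "data:" field followed by a blank line that ends the event.
    static String dataEvent(String payload) {
        return "data:" + payload + "\n\n";
    }

    // A keep-alive: in the event-stream format a line starting with ':' is a comment.
    static String keepAlive() {
        return ":keep alive\n\n";
    }

    // Simplified client-side parsing: collect data payloads and skip comments.
    // Assumes single-line payloads; a real EventSource also handles multi-line
    // data, event names, ids, and retry fields.
    static List<String> parseData(String stream) {
        List<String> payloads = new ArrayList<>();
        for (String line : stream.split("\n")) {
            if (line.startsWith(":")) {
                continue; // comment line: traffic on the wire, but no event for listeners
            }
            if (line.startsWith("data:")) {
                payloads.add(line.substring("data:".length()).trim());
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        String stream = keepAlive() + dataEvent("{\"text\":\"hello\"}") + keepAlive();
        // Only the data payload is collected; both keep-alive comments are skipped.
        System.out.println(parseData(stream));
    }
}
```

The point of the sketch is that the keep-alive costs a few bytes per interval and needs no handling in the browser code, which is why it can be merged into the stream without touching the "message" listener.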

Toerktumlare