
The problem lies somewhere between Project Reactor and Reactive MongoDB (Spring Data).

When executing a stream that contains, in this order:

  1. A method that operates on Reactive MongoDB and is very quick
  2. A method that takes more than 30 seconds

the stream is cancelled (see the code and logs below):

@GetMapping("/test/{msg}")
public Mono<SomeObject> test(@PathVariable String msg) {
    return repository.findByMessage(msg).log("1")
          .map(someObj -> delaySeconds(someObj, 35)).log("2");
}

As you can see, the stream is cancelled after 30 seconds, but another 5 seconds later (the delay is 35 seconds) the onNext event is still emitted.

12:59:18.556 [Thread-9] INFO  com.why.temp.TempController - Saved:SomeObject(id=5b604106ef301746a86665f3, message=WHY)
12:59:18.591 [http-nio-8080-exec-2] INFO  1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
12:59:18.592 [http-nio-8080-exec-2] INFO  2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
12:59:18.593 [http-nio-8080-exec-2] INFO  2 - | request(unbounded)
12:59:18.593 [http-nio-8080-exec-2] INFO  1 - | request(unbounded)
12:59:18.612 [Thread-8] INFO  1 - | onNext(SomeObject(id=5b604106ef301746a86665f3, message=WHY))
12:59:49.116 [http-nio-8080-exec-3] INFO  2 - | cancel()
12:59:49.117 [http-nio-8080-exec-3] INFO  1 - | cancel()
12:59:53.612 [Thread-8] INFO  2 - | onNext(SomeObject(id=5b604106ef301746a86665f3, message=WHY))

Can you explain why the stream is cancelled and how I can handle this?

Is there a timeout that should be increased, or am I using the Project Reactor stream API and MongoDB in the wrong way?

Here is my MongoDB configuration:

@Bean
public ReactiveMongoTemplate reactiveMongoTemplate() {
    ConnectionString str = new ConnectionString(env.getMongoUri());
    return new ReactiveMongoTemplate(MongoClients.create(str), str.getDatabase());
}

Any idea? Please upvote this question if you have a similar problem.

The workaround for this is simple, but not very elegant:

@GetMapping("/test/{msg}")
public Mono<SomeObject> test(@PathVariable String msg) {
    // block() waits for the MongoDB query to finish before the slow step starts
    SomeObject found = repository.findByMessage(msg).block();
    return Mono.just(found).log("1")
        .map(someObj -> delaySeconds(someObj, 35)).log("2");
}
Wojciech Marusarz
  • Off the top of my head, I'd say that most likely not all fields of a msg are indexed. This potentially causes a collection scan, which, depending on the size of the collection, may take a very long time. – Markus W Mahlberg Aug 03 '18 at 06:33
  • In this case, _repository.findByMessage(msg)_ is very quick, but if the next step in the stream takes more than 30 seconds (_delaySeconds(someObj, 35)_ in this case), the whole stream is cancelled. I suppose that some connection expires, which should have been closed right after the _findByMessage_ operation. – Wojciech Marusarz Aug 03 '18 at 06:49

1 Answer


I had a similar issue when a chain of reactive operations took longer than the magic 30 seconds. In my case the cause was the Spring MVC async request timeout; here is the solution:

Recurring AsyncRequestTimeoutException in Spring Boot Admin log

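The default value of spring.mvc.async.request-timeout is 30s (when the property is not set, the servlet container's default applies, which is 30 seconds on Tomcat).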
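
As a rough illustration, the timeout could also be raised in Java configuration. This is only a sketch, assuming a Spring MVC application running on a servlet container (as the http-nio-8080 threads in the question's logs suggest). The class name AsyncTimeoutConfig and the 60-second value are made up for this example; pick a value that covers your slowest request.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

// Hypothetical class name; any @Configuration class found by component scanning would do.
@Configuration
public class AsyncTimeoutConfig implements WebMvcConfigurer {

    // Assumed value: 60 seconds, chosen only to exceed the 35-second delay in the question.
    private static final long ASYNC_TIMEOUT_MILLIS = 60_000L;

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // Timeout, in milliseconds, applied to asynchronous request handling,
        // which is how Spring MVC serves controller methods that return a Mono.
        configurer.setDefaultTimeout(ASYNC_TIMEOUT_MILLIS);
    }
}
```

Setting spring.mvc.async.request-timeout in application.properties should have the same effect without any code. Note that a longer timeout only stops the cancellation; the blocking 35-second step still occupies a thread for its whole duration.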

I believe that it will help :).

Cheers!