Problem is somewhere in between Project Reactor and Reactive MongoDB (Spring Data).
When executing a stream that contains (in the following order):
- Method that operates on Reactive MongoDB which is very quick
- Method that takes more than 30 seconds
Stream is being cancelled (look at the code and logs below)
@GetMapping("/test/{msg}")
public Mono<SomeObject> test(@PathVariable String msg) {
return repository.findByMessage(msg).log("1")
.map(someObj -> delaySeconds(someObj, 35)).log("2");
}
As you can see, after 30 seconds stream is cancelled, but after another 5 seconds (timeout is 35 seconds) there's event onNext executed.
12:59:18.556 [Thread-9] INFO com.why.temp.TempController - Saved:SomeObject(id=5b604106ef301746a86665f3, message=WHY)
12:59:18.591 [http-nio-8080-exec-2] INFO 1 - | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
12:59:18.592 [http-nio-8080-exec-2] INFO 2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
12:59:18.593 [http-nio-8080-exec-2] INFO 2 - | request(unbounded)
12:59:18.593 [http-nio-8080-exec-2] INFO 1 - | request(unbounded)
12:59:18.612 [Thread-8] INFO 1 - | onNext(SomeObject(id=5b604106ef301746a86665f3, message=WHY))
12:59:49.116 [http-nio-8080-exec-3] INFO 2 - | cancel()
12:59:49.117 [http-nio-8080-exec-3] INFO 1 - | cancel()
12:59:53.612 [Thread-8] INFO 2 - | onNext(SomeObject(id=5b604106ef301746a86665f3, message=WHY))
Can you explain me why stream is cancelled and how can I handle this?
Is there any timeout that should be increased or am I using Project Reactor Stream API and MongoDB in a wrong way?
Here is my MongoDB Configuration
@Bean
public ReactiveMongoTemplate reactiveMongoTemplate() {
ConnectionString str = new ConnectionString(env.getMongoUri());
return new ReactiveMongoTemplate(MongoClients.create(str), str.getDatabase());
}
Any idea? Please upvote this question if you have a similar problem.
Workaround for this is simple, but not so elegant:
@GetMapping("/test/{msg}")
public Mono<SomeObject> test(@PathVariable String msg) {
SomeObj someObj = repository.findByMessage(msg).block();
return Mono.just(someObj).log("1")
.map(someObj -> delaySeconds(someObj, 35)).log("2");
}