I am working on a web application where the user's connection times out after a fixed period (say 20 seconds). For long-running requests I have to return a default message ("your request is under process") and then email the actual result to the user.

I couldn't do this with Spring Web because I didn't know how to specify a timeout in the controller (with a customized message per request) while still letting other requests come through and be processed. That's why I used Spring WebFlux, which has a timeout operator for both the Mono and Flux types.
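As a minimal sketch of the timeout operator mentioned above, assuming only Reactor core on the classpath: the helper name `respondWithin` and the delays are hypothetical and chosen purely for illustration. It shows `Mono.timeout(Duration, fallback)` switching to a default message when the source is too slow.

```kotlin
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical helper: returns the source's value if it arrives within the
// limit, otherwise the default message used in the question.
fun respondWithin(source: Mono<String>, limit: Duration): String? =
    source
        .timeout(limit, Mono.just("your request is under process"))
        .block()

fun main() {
    // Stand-ins for a fast and a slow lookup (illustrative delays only).
    val fast = Mono.delay(Duration.ofMillis(10)).map { "actual result" }
    val slow = Mono.delay(Duration.ofMillis(500)).map { "actual result" }

    println(respondWithin(fast, Duration.ofMillis(200)))
    println(respondWithin(slow, Duration.ofMillis(50)))
}
```

One design point worth keeping in mind: when the timeout fires, the operator cancels its subscription to the source before switching to the fallback, so any work that must continue (such as producing the result to email) has to be driven by a separate subscription.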

To run the requested process in a different thread, I have used Sinks: one to receive requests and one to publish the results. My problem is that the response sink only ever returns one result, and subsequent calls to the URL return an empty response. For example, the first call to /reactive/getUser/123456789 returns the user object, but subsequent calls return empty.

I'm not sure whether the problem is with the Sink I have used or with how I am getting data from it. In the sample code I have used responseSink.asFlux().next(), but I have also tried .single(), .toMono() and .take(1), all to no avail: I get the same result.

@RequestMapping("/reactive")
@RestController
class SampleController @Autowired constructor(private val externalService: ExternalService) {
  private val requestSink = Sinks.many().multicast().onBackpressureBuffer<String>()
  private val responseSink = Sinks.many().multicast().onBackpressureBuffer<AppUser>()

  init {
    requestSink.asFlux()
      .map { phoneNumber -> externalService.findByIdOrNull(phoneNumber) }
      .doOnNext {
        if (it != null) {
          responseSink.tryEmitNext(it)
        } else {
          responseSink.tryEmitError(Throwable("didn't find a value for that phone number"))
        }
      }
      .subscribe()
  }

  @GetMapping("/getUser/{phoneNumber}")
  fun getUser(@PathVariable phoneNumber: String): Mono<String> {
    requestSink.tryEmitNext(phoneNumber)
    return responseSink.asFlux()
      .next()
      .map { it.toString() }
      .timeout(Duration.ofSeconds(20), Mono.just("processing your request"))
  }
}
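The behavior described above can be reproduced outside the controller. The following is a minimal sketch, assuming Reactor core 3.4 or later; the helper `readTwice` is hypothetical and not part of the original code. It compares the default `onBackpressureBuffer()` sink with one created through the `onBackpressureBuffer(bufferSize, autoCancel)` overload with `autoCancel = false`, which is the cause suggested in the thread linked in the comments.

```kotlin
import reactor.core.publisher.Sinks
import reactor.util.concurrent.Queues
import java.time.Duration

// Hypothetical helper: emits a value, reads it with next(), then repeats once.
// next() cancels its subscription after the first element it receives.
fun readTwice(sink: Sinks.Many<String>): List<String?> {
    sink.tryEmitNext("first")
    val first = sink.asFlux().next().block(Duration.ofSeconds(1))
    sink.tryEmitNext("second")
    val second = sink.asFlux().next().block(Duration.ofSeconds(1))
    return listOf(first, second)
}

fun main() {
    // Same factory call as responseSink in the controller (autoCancel defaults to true).
    val defaultSink = Sinks.many().multicast().onBackpressureBuffer<String>()

    // Assumption: disabling autoCancel keeps the sink usable after its
    // last subscriber cancels.
    val keepAliveSink = Sinks.many().multicast()
        .onBackpressureBuffer<String>(Queues.SMALL_BUFFER_SIZE, false)

    println("default sink:    " + readTwice(defaultSink))
    println("autoCancel off:  " + readTwice(keepAliveSink))
}
```

If the auto-cancel explanation is right, the default sink should yield a value only on the first read, while the second sink should yield one on both. Note that this only addresses the empty responses: a single shared multicast sink still does not pair each response with the request that triggered it, and `tryEmitError` terminates the sink for every later caller.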
  • All connections will time out after a while, otherwise you risk having memory leaks. If you want two-way communication, use websockets. If you want one-way communication, as in the client subscribing and getting information, use server-sent events and send "keep alive" messages/comments. Also, null checks are redundant in Reactor, as null is forbidden from being returned downstream. – Toerktumlare Sep 21 '21 at 07:59
  • I'm not sure how that answers the question. Why does the controller method only return results the first time and not on subsequent calls? – hamid Sep 21 '21 at 08:10
  • Your code is quite strange. `requestSink.tryEmitNext(phoneNumber)` does not do anything: you are placing something into this sink, but no client is subscribing to it. The code in your init block runs just once, and you should not be subscribing in your own application. – Toerktumlare Sep 21 '21 at 10:15
  • Please run the code first and then call it strange. I've tested and done logging. It works. The only problem is the controller only responds to the first request. – hamid Sep 21 '21 at 10:30
  • Clearly it doesn't work, because if it did you would not be asking on Stack Overflow. Second of all, just because something "half works", it can still be strange. Read my comment, and if you don't understand it, then start googling. I will repeat what I wrote: you can put stuff into the requestSink all day long, but no one is subscribing to it, so nothing will happen. You have clearly misunderstood the entire use of sinks, and I suggest you read the chapter on sinks in the reactive documentation. – Toerktumlare Sep 21 '21 at 11:53
  • Alright. If I haven't subscribed to the sink, why do I get a response from calling the URL? My question was why I only get the response the first time and not on subsequent calls. The return type is Mono. That's how subscription happens in WebFlux. I don't need to explicitly call subscribe. – hamid Sep 23 '21 at 00:59
  • Look at the flow: when the class is created, init is run, and since you are subscribing there it will run the code, fetch a phone number and place it in the buffer of the responseSink. Then you make your request, and what is returned is the responseSink, where you have one phone number. I have no idea why you have a request AND a response sink; you only need one, and the one you use must be returned to the client. You don't really seem to understand how sinks are to be used, which is why I'm telling you that you should go through the documentation. At SO we don't teach the basics; that's what docs are for. – Toerktumlare Sep 23 '21 at 06:27
  • Sinks are a more complicated topic, so you should be well versed in Reactor before you start using them. To understand the basics you should read up on how to create a sequence https://projectreactor.io/docs/core/release/reference/#producing and then you can read about processors and sinks https://projectreactor.io/docs/core/release/reference/#processors – Toerktumlare Sep 23 '21 at 07:51
  • Honestly, you haven't told me much but you have certainly taught me how to increase my score in SO. Pretend to know something and then upvote your own comments. I've used sinks to run the code in different threads. I dare you to fix the code for a bounty. – hamid Sep 23 '21 at 11:14
  • @Neil Swingler explains the root cause in this thread: https://stackoverflow.com/questions/66671636/why-is-sinks-many-multicast-onbackpressurebuffer-completing-after-one-of-t – Leonar T Apr 30 '23 at 17:18