0

I'm investigating how Spring Reactor flow works and have some scenario which can't handle. I want to perform async action when Mono is completed and ready to return result, so I don't block request thread anymore. I don't care about exeption or result in that async action, that's consumer.

In example below I want image procesing to happen in async way after result is returned:

log.debug("Creating image for quote: [{}]", quoteId);
return Mono.fromFuture(quoteRepository.findById(quoteId)
                       .flatMap(original -> {
                           log.debug("Saving image meta for quote: [{}]", quoteId);
                           original.setImageUrl(imageMeta.getImageUrl());
                           return quoteRepository.save(original);
                       })
                       .toFuture()
                       .whenCompleteAsync((quote, ex) -> {
                           log.debug("Processing image meta for quote: [{}]", quoteId);
                           imageService.processImage(quote.getImageUrl());
                       }, taskExecutor))
            .doOnSuccess(quote -> log.debug("Image meta is saved: [{}]", quote.getId()));

But from log statements I gotresult isn't returned until image processing is finished.

23:32:04.974 DEBUG [reactor-http-nio-2] Creating image for quote: [32]
23:32:05.319 DEBUG [reactor-tcp-nio-1] Saving image meta for quote: [32]
23:32:05.340 DEBUG [task-executor-thread1] Processing image meta for quote: [32]
23:32:07.337 DEBUG [task-executor-thread1] Image meta is saved for quote [32]

I also heard something about using schedulers, but this hasn't helped, maybe I'm using incorrectly that:

return quoteRepository.findById(quoteId)
                       .flatMap(original -> {
                           log.debug("Saving image meta for quote: [{}]", quoteId);
                           original.setImageUrl(imageMeta.getImageUrl());
                           return quoteRepository.save(original);
                       })
                       .subscribeOn(Schedulers.elastic())
                       .doOnNext(quote -> {
                           log.debug("Processing image meta for quote: [{}]", quoteId);
                           imageService.processImage(quote.getImageUrl());
                       })
                       .doOnSuccess(quote -> log.debug("Image meta is saved: [{}]", quote.getId()));

Result still isn't returned before image is processed.

23:32:04.974 DEBUG [reactor-http-nio-2] Creating image for quote: [32]
23:32:05.319 DEBUG [reactor-tcp-nio-1] Saving image meta for quote: [32]
23:32:05.340 DEBUG [reactor-tcp-nio-1] Processing image meta for quote: [32]
23:32:07.337 DEBUG [reactor-tcp-nio-1] Image meta is saved for quote [32]

Spring Boot: 2.3.5

Reactor core: 3.3.11

NickEm
  • 41
  • 3

1 Answers1

0

I've accomplished what I wanted to do in a way:

return quoteRepository.findById(quoteId)
                              .flatMap(original -> {
                                  log.debug("Saving image meta for quote: [{}]", quoteId);
                                  original.setImageUrl(imageMeta.getImageUrl());
                                  return quoteRepository.save(original);
                              })
                              .doOnSuccess(quote -> 
                                                   taskExecutor.execute(() -> { 
                                                       log.debug("Processing image meta for quote: [{}]", quoteId);
                                                       imageService.processImage(quote.getImageUrl()); 
                                                   })
                              )
                              .doOnSuccess(quote -> log.debug("Image meta is saved: [{}]", quote.getId()));

But don't we have something native to Spring Flux/Reactor to do the thing equal for taskExecutor?

NickEm
  • 41
  • 3