15

I am using Spring WebFlux with Spring Data JPA and PostgreSQL as the backend database. I don't want to block the main thread while making database calls such as find and save. To achieve this, I use a main scheduler in the controller class and a jdbcScheduler in the service classes.

The way I have defined them is:

@Configuration
@EnableJpaAuditing
public class CommonConfig {

    @Value("${spring.datasource.hikari.maximum-pool-size}")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() {
        return Schedulers.parallel();
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        return new TransactionTemplate(transactionManager);
    }
}

Now, while doing a get/save call in my service layer I do:

    @Override
    public Mono<Config> getConfigByKey(String key) {
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    }

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
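        // Flux.defer delays the blocking repository call until subscription,
        // so it runs on jdbcScheduler instead of on the calling thread.
        return Flux
            .defer(() -> Flux.fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);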
    }

    @Override
    public Flux<Config> addConfigs(List<Config> configList) {
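        // Flux.defer delays the blocking saveAll call until subscription,
        // so it runs on jdbcScheduler instead of on the calling thread.
        return Flux.defer(() -> Flux.fromIterable(configRepository.saveAll(configList)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);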
    }

And in controller, I do:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    }

Is this correct, or is there a better way to do it?

What I understand by:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);

is that the task will run on jdbcScheduler threads and the result will later be published on my main parallel scheduler. Is this understanding correct?

abstractKarshit

2 Answers

11

Your understanding is correct with regard to publishOn and subscribeOn (see the reference documentation in the Reactor project about those operators).
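As a rough illustration of that thread hand-off, here is a minimal sketch that uses only the JDK's CompletableFuture as an analogy, not Reactor itself. The names `blockingFind`, `jdbc-worker` and `main-worker` are hypothetical stand-ins: the first stage plays the role of the work scheduled by subscribeOn(jdbcScheduler), and the second stage plays the role of the downstream steps after publishOn(scheduler).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerHopSketch {

    // Hypothetical stand-in for a blocking JPA call such as configRepository.findByKey(key).
    static String blockingFind(String key) {
        try {
            Thread.sleep(50); // simulate a JDBC round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-for-" + key;
    }

    // Returns "<thread that ran the blocking call> -> <thread that ran the mapping step>".
    static String runOnce() {
        ExecutorService jdbcPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "jdbc-worker"));
        ExecutorService mainPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "main-worker"));
        try {
            return CompletableFuture
                // Analogous to subscribeOn(jdbcScheduler): the blocking call runs on the JDBC pool.
                .supplyAsync(() -> {
                    blockingFind("some-key");
                    return Thread.currentThread().getName();
                }, jdbcPool)
                // Analogous to publishOn(scheduler): later steps run on the main pool.
                .thenApplyAsync(source -> source + " -> " + Thread.currentThread().getName(), mainPool)
                .join();
        } finally {
            jdbcPool.shutdown();
            mainPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints "jdbc-worker -> main-worker"
    }
}
```

The caller of `runOnce()` only waits because of `join()`; in a reactive pipeline the result would be passed on without blocking the caller.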

If you call blocking libraries without scheduling that work on a specific scheduler, those calls will block one of the few threads available (by default, the Netty event loop) and your application will only be able to serve a few requests concurrently.

Now I'm not sure what you're trying to achieve by doing that.

First, the parallel scheduler is designed for CPU-bound tasks, meaning it has only a few worker threads: as many as there are CPU cores (or a few more). In this case, it's like setting your thread pool size to the number of cores on a regular Servlet container. Your app won't be able to process a large number of concurrent requests.
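The effect of a small pool on blocking work can be sketched with plain JDK executors. This is only an illustration under assumptions: `peakConcurrency` is a hypothetical helper, and `Thread.sleep` stands in for a blocking database call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {

    // Submits `requests` blocking tasks to a pool of `poolSize` threads and
    // returns the highest number of tasks that were ever running at once.
    static int peakConcurrency(int poolSize, int requests) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                futures.add(pool.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20); // simulated blocking call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for every task to finish
            }
            return peak.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("peak concurrency: " + peakConcurrency(cores, cores * 4));
    }
}
```

However many requests are submitted, the number of blocking tasks in flight never exceeds the pool size; the remaining requests wait in the queue.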

Even if you choose a better alternative (like the elastic Scheduler), it will still not be as good as the Netty event loop, which is where request processing is scheduled natively in Spring WebFlux.

If your ultimate goal is performance and scalability, wrapping blocking calls in a reactive app is likely to perform worse than your regular Servlet container.

You could instead use Spring MVC and:

  • use usual blocking return types when you're dealing with a blocking library, like JPA
  • use Mono and Flux return types when you're not tied to such libraries

This won't be non-blocking, but it will still be asynchronous, and you'll be able to do more work in parallel without dealing with the complexity.

Brian Clozel
    Hi Brian, thanks for the quick response and precise explanation. What I assumed was that all my main execution would happen on the main scheduler (which is parallel in my case) and JDBC tasks would be offloaded to another jdbcScheduler, without blocking my main thread while waiting for the response of the call. What I understand from your answer is that I should not use a subscribeOn in the controller layer and should rather leave it to the Netty event loop. The idea to do something like this came from here: https://dzone.com/articles/spring-5-webflux-and-jdbc-to-block-or-not-to-block – abstractKarshit Feb 25 '19 at 11:02
    Also, if possible, can someone tell me what happens if I don't add a subscribeOn anywhere in my Client -> Controller -> Service -> Repository -> Controller -> Client call (here, the client is external)? It seems to be working fine on my local machine; what will be the implications of this on a high-throughput web server? – abstractKarshit Feb 25 '19 at 11:16
    If you leave out the `publishOn`, then things will run on the scheduler you've configured in `subscribeOn`. So no, things won't happen on the Netty loop anymore. My advice is to use Spring MVC because it supports reactive return types. It is likely to perform better than WebFlux wrapping blocking calls – Brian Clozel Feb 25 '19 at 11:18
    I utterly disagree with ***If your ultimate goal is performance and scalability, wrapping blocking calls in a reactive app is likely to perform worse than your regular Servlet container***. Wrapping blocking calls in a reactive app is still better than a thread-based Servlet container, because you only use the thread-based style (a thread pool or Servlet container) for the blocking calls instead of for everything. – InformedA Sep 15 '20 at 19:17
0

IMHO, there is a way to execute this operation that makes better use of the machine's resources. Following the documentation, you can wrap the blocking call so that it runs on another thread, which lets the calling thread continue its execution.

Ivan Rodrigues