31

Spring 5 introduces the reactive programming style for rest APIs with webflux. I'm fairly new to it myself and was wondering wether wrapping synchronous calls to a database into Flux or Mono makes sense preformence-wise? If yes, is this the way to do it:

@RestController
public class HomeController {

    private MeasurementRepository repository;

    public HomeController(MeasurementRepository repository){
        this.repository = repository;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
    }

}

Is there something like an asynchronous CrudRepository? I couldn't find it.

codependent
  • 23,193
  • 31
  • 166
  • 308
Lukasz
  • 2,257
  • 3
  • 26
  • 44
  • JDBC code is inherently synchronous there aren't any reactive JDBC drivers out there (and I doubt there ever will be). So accessing a database like this doesn't really make sense. – M. Deinum Feb 17 '17 at 13:48
  • I am not familiar with flux, but I know you can use Java 8 Stream as the return type in Spring Data JPA. You can return `Stream`. Not sure if this comment helps or not though :) – burcakulug May 04 '17 at 00:34
  • It's a good start, but it's not asynchronous so the caller blocks whilst the JDBC operation is in progress which breaks the webflux non-blocking paradigm. – NeilS Jun 07 '17 at 19:45

4 Answers4

30

One option would be to use alternative SQL clients that are fully non-blocking. Some examples include: https://github.com/mauricio/postgresql-async or https://github.com/finagle/roc. Of course, none of these drivers is officially supported by database vendors yet. Also, functionality is way much less attractive comparing to mature JDBC-based abstractions such as Hibernate or jOOQ.

The alternative idea came to me from Scala world. The idea is to dispatch blocking calls into isolated ThreadPool not to mix blocking and non-blocking calls together. This will allow us to control the overall number of threads and will let the CPU serve non-blocking tasks in the main execution context with some potential optimizations. Assuming that we have JDBC based implementation such as Spring Data JPA which is indeed blocking, we can make it’s execution asynchronous and dispatch on the dedicated thread pool.

@RestController
public class HomeController {

    private final MeasurementRepository repository;
    private final Scheduler scheduler;

    public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) {
        this.repository = repository;
        this.scheduler = scheduler;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler);
    }

}

Our Scheduler for JDBC should be configured by using dedicated Thread Pool with size count equal to the number of connections.

@Configuration
public class SchedulerConfiguration {
    private final Integer connectionPoolSize;

    public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
        this.connectionPoolSize = connectionPoolSize;
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

}

However, there are difficulties with this approach. The main one is transaction management. In JDBC, transactions are possible only within a single java.sql.Connection. To make several operations in one transaction, they have to share a connection. If we want to make some calculations in between them, we have to keep the connection. This is not very effective, as we keep a limited number of connections idle while doing calculations in between.

This idea of an asynchronous JDBC wrapper is not new and is already implemented in Scala library Slick 3. Finally, non-blocking JDBC may come along on the Java roadmap. As it was announced at JavaOne in September 2016, and it is possible that we will see it in Java 10.

Grygoriy Gonchar
  • 3,898
  • 1
  • 24
  • 16
  • Is there a recommended option or WIP project that implements the CrudRepository? It's a shame to say that SQL projects can't use reactor/WebFlux until Java 10 at the earliest. – NeilS Jun 07 '17 at 19:43
  • @NeilS from multiple sources I can see that this work is not planned yet, at least by Pivotal: https://jira.spring.io/browse/DATAJPA-701 https://spring.io/blog/2017/02/23/spring-framework-5-0-m5-update#comment-3174616521 – Grygoriy Gonchar Jun 07 '17 at 23:12
  • 5
    The publishOn(scheduler) call looks weird to me in the code example. Shoud you us instead subscribeOn which run the request() to this publisher on a the given scheduler ? – etiennepeiniau Sep 14 '17 at 20:25
  • 5
    It should instead be subscribeOn() method using a thread pool not publishOn() as the code above has. – ROCKY Mar 14 '18 at 18:28
  • what about insert? do we need commit transaction ourselves? because I suspect that @Transaction will not work for such case, because we quit from method before performing work in thread. – Simon May 03 '18 at 11:24
10

Based on this blog you should rewrite your snippet in following way

@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
    return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
           .subscribeOn(Schedulers.elastic());
}
Dmytro Boichenko
  • 5,217
  • 3
  • 28
  • 31
7

Obtaining a Flux or a Mono doesn’t necessarily mean it will run in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.

If you have blocking persistence APIs (JPA, JDBC) or networking APIs to use, Spring MVC is the best choice for common architectures at least. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread but you would not be making the most of a non-blocking web stack.

So... How do I wrap a synchronous, blocking call?

Use Callable to defer execution. And you should use Schedulers.elastic because it creates a dedicated thread to wait for the blocking resource without tying up some other resource.

  • Schedulers.immediate() : Current thread.
  • Schedulers.single() : A single, reusable thread.
  • Schedulers.newSingle() : A per-call dedicated thread.
  • Schedulers.elastic() : An elastic thread pool. It creates new worker pools as needed, and reuse idle ones. This is a good choice for I/O blocking work for instance.
  • Schedulers.parallel() : A fixed pool of workers that is tuned for parallel work.

example:

Mono.fromCallable(() -> blockingRepository.save())
        .subscribeOn(Schedulers.elastic());
kkd927
  • 317
  • 4
  • 11
5

Spring data support reactive repository interface for Mongo and Cassandra.

Spring data MongoDb Reactive Interface

Spring Data MongoDB provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports reactive type conversion between reactive types.

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {

    Flux<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Flux<Person> findByLastname(Mono<String> lastname);

    Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Flux<Person> findWithTailableCursorBy();

}

public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {

    Observable<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Single<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Observable<Person> findByLastname(Single<String> lastname);

    Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Observable<Person> findWithTailableCursorBy();
}
Juan Carlos Mendoza
  • 5,736
  • 7
  • 25
  • 50
yousafsajjad
  • 973
  • 2
  • 18
  • 34
  • In the original question, the database result needs to be returned back as response to the RESTful call. In this scenario , is there really a use in using Flux R2DBC ? It has to be blocking anyways – lives Jul 16 '21 at 20:01