6

I am playing with R2DBC using Postgre SQL. The usecase i am trying is to get the Film by ID along with Language, Actors and Category. Below is the schema

enter image description here

this is the corresponding piece of code in ServiceImpl

@Override
public Mono<FilmModel> getById(Long id) { 
    Mono<Film> filmMono = filmRepository.findById(id).switchIfEmpty(Mono.error(DataFormatException::new)).subscribeOn(Schedulers.boundedElastic());
    Flux<Actor> actorFlux = filmMono.flatMapMany(this::getByActorId).subscribeOn(Schedulers.boundedElastic());
    Mono<String> language = filmMono.flatMap(film -> languageRepository.findById(film.getLanguageId())).map(Language::getName).subscribeOn(Schedulers.boundedElastic());
    Mono<String> category = filmMono.flatMap(film -> filmCategoryRepository
                    .findFirstByFilmId(film.getFilmId()))
            .flatMap(filmCategory -> categoryRepository.findById(filmCategory.getCategoryId()))
            .map(Category::getName).subscribeOn(Schedulers.boundedElastic());

    return Mono.zip(filmMono, actorFlux.collectList(), language, category)
            .map(tuple -> {
                FilmModel filmModel = GenericMapper.INSTANCE.filmToFilmModel(tuple.getT1());
                List<ActorModel> actors = tuple
                        .getT2()
                        .stream()
                        .map(act -> GenericMapper.INSTANCE.actorToActorModel(act))
                        .collect(Collectors.toList());
                filmModel.setActorModelList(actors);
                filmModel.setLanguage(tuple.getT3());
                filmModel.setCategory(tuple.getT4());
                return filmModel;
            });
         }

The logs show 4 calls to film

2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [actor-tcp-nio-9] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-12] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.162 DEBUG 32493 --- [actor-tcp-nio-9] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT language.* FROM language WHERE language.language_id = $1 LIMIT 2]
2021-12-16 21:21:20.188 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film_actor.actor_id, film_actor.film_id, film_actor.last_update FROM film_actor WHERE film_actor.film_id = $1]
2021-12-16 21:21:20.188 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film_category.film_id, film_category.category_id, film_category.last_update FROM film_category WHERE film_category.film_id = $1 LIMIT 1]
2021-12-16 21:21:20.313 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT category.* FROM category WHERE category.category_id = $1 LIMIT 2]
2021-12-16 21:21:20.563 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT actor.* FROM actor WHERE actor.actor_id = $1 LIMIT 2]

I am not trying to look for SQL optimizations(joins etc).I can definitely make it more performant. But the question in point is why i do see 4 SQL queries to Film table. Just to add i have already fixed the code. But not able to understand the core reason.Thanks in advance.

lkatiforis
  • 5,703
  • 2
  • 16
  • 35
Harry
  • 528
  • 1
  • 5
  • 21
  • This is just a general comment, `subscribeOn` will place the entire subscription on the defined scheduler. Which means you only need one. Having multiple will not make any differens, reactor will during the assembly phase locate the first it finds and use that. This is well covered in the reactor documentation. So if you insist on using it, just have one and remove the rest. – Toerktumlare Dec 17 '21 at 08:23
  • The subscribeOn was to run the fetch parallel in its own thread. Each will run in its own thread rather than running in single thread. – Harry Dec 17 '21 at 16:46
  • please read what i just explained. First of all, none of the stuff you have will be run in `parallel` To run things in parallel you need a `ParallelFlux` the code you have now is per default running `async` but you added `onSubscribe` so its actually run on a single thread that it gets assigned when someone subscribes. Please read the documentation https://projectreactor.io/docs/core/release/reference/#schedulers you dont need them. Remove them. Dont use a parallel flux, things wont go faster https://stackoverflow.com/questions/68972035/run-mono-in-parallel-doesnt-seems-faster . Use defaults – Toerktumlare Dec 17 '21 at 18:08
  • @Toerktumlare : thanks for the reply. After your comment i actually removed the code for onSubscribe and tried( along with combinations of defer as suggested by Sam Hughes and without it ). What i saw was that the code is getting executed on different threads( you can take a look at the log). My initial hypothesis was that it was because i was scheduling it on different threads. But even after removing it..it still scheduled on different threads.This is little bit confusing to me. The only reason i can think of is..that the r2dbc is actually scheduling them on different threads. – Harry Dec 17 '21 at 21:28
  • No, its beacuse thats how reactor works, any thread can do anything at any time. Reactor will try utilize as many threads as possible under the hood. Thats the whole point of using reactive, this is covered in the docs. Please read them, they will teach you all this stuff – Toerktumlare Dec 18 '21 at 01:36
  • The docs says that "Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command" and this has been my experience using rxjava and reactor. So i strongly disagree that there is a magic happening. There must be a reason for how and when things are getting scheduled/published on threads. Also please note that i understand how webflux,netty ( event loop etc), webclient and LMAX Disruptor works. So i am just trying to make sense of what i am seeing. Does that make sense ?? – Harry Dec 18 '21 at 20:29
  • no it does not make sense. Tomcat for instance enforces a thread model, which means, "one thread per request". React is agnostic, which means any thread can at any point do any work to maximize the usage time of threads. No there is no magic, asi t says, when any work is done, any thread can schedule any work on the eventloop. Any thread can pick up when the event loop is done with the work. Reactor schedules work using any thread, event loop does the work, any thread can pick up the work. – Toerktumlare Dec 19 '21 at 00:51

2 Answers2

3

Why I do see 4 SQL queries to Film table.

The reason is quite simple. You are subscribing to the Mono<Film> 4 times:

Mono<Film> filmMono = filmRepository.findById(id);

Flux<Actor> actorFlux = filmMono.flatMapMany(...); (1)
Mono<String> language = filmMono.flatMap(...); (2)
Mono<String> category = filmMono.flatMap(...); (3)
Mono.zip(filmMono, actorFlux.collectList(), language, category) (4)

Each subscription to the filmMono triggers a new query. Note that, you can change that by using Mono#cache operator to turn filmMono into a hot source and cache the result for all four subscribers.

lkatiforis
  • 5,703
  • 2
  • 16
  • 35
  • 1
    thanks for the answer and solution. This resolved the issue. I did see something different as far as thread scheduling goes. But i think i will create a seperate post regarding the thread scheduling. Thanks for teaching me a new operation today. Made my day. – Harry Dec 18 '21 at 20:16
0

I'm not terribly familiar with your stack, so this is a high-level answer to hit on your "Why". There WILL be a more specific answer for you, somewhere down the pipe (e.g. someone that can confirm whether this thread is relevant).

While I'm no Spring Daisy (or Spring dev), you bind an expression to filmMono that resolves as the query select film.* from film..... You reference that expression four times, and it's resolved four times, in separate contexts. The ordering of the statements is likely a partially-successful attempt by the lib author to lazily evaluate the expression that you bound locally, such that it's able to batch the four accidentally identical queries. You most likely resolved this by collecting into an actual container, and then mapping on that container instead of the expression bound to filmMono.

In general, this situation is because the options available to library authors aren't good when the language doesn't natively support lazy evaluation. Because any operation might alter the dataset, the library author has to choose between:

  • A, construct just enough scaffolding to fully record all resources needed, copy the dataset for any operations that need to mutate records in some way, and hope that they can detect any edge-cases that might leak the scaffolding when the resolved dataset was expected (getting this right is...hard).
  • B, resolve each level of mapping as a query, for each context it appears in, lest any operations mutate the dataset in ways that might surprise the integrator (e.g. you).
  • C, as above, except instead of duplicating the original request, just duplicate the data...at every step. Pass-by-copy gets real painful real fast on the JVM, and languages like Clojure and Scala handle this by just making the dev be very specific about whether they want to mutate in-place, or copy then mutate.

In your case, B made the most sense to the folks that wrote that lib. In fact, they apparently got close enough to A that they were able to batch all the queries that were produced by resolving the expression bound to filmMono (which are only accidentally identical), so color me a bit impressed.

Many access patterns can be rewritten to optimize for the resulting queries instead. Your milage may vary...wildly. Getting familiar with raw SQL, or else a special-purpose language like GraphQL, can give much more consistent results than relational mappers, but I'm ever more appreciative of good IDE support, and mixing domains like that often means giving up auto-complete, context highlighting, lang-server solution-proofs and linting.

Given that the scope of the question was "why did this happen?", even noting my lack of familiarity with your stack, the answer is "lazy evaluation in a language that doesn't natively support it is really hard."

Sam Hughes
  • 665
  • 8
  • 10
  • Thanks for the explanation. I am aware of lazy and eager evaluation. I tried changing the code using the defer operator and removing the subscribes on each subscription but the behaviour is the same. – Harry Dec 17 '21 at 17:04
  • Did you mess with switchIfEmpty()? Not my domain here, but the linked thread looks relevant. A specific test would be the following: 1. Confirm that it is from how that line is constructed by first collecting the result of that expression into a container. 2. Define a class with a success property and an error property, with an initial guard value, and a method that replaces the guard values and thereafter ignores its parameter, just returning the resolved values, for success or error as appropriate. Initialize and pass that method to switchIfEmpty(). That is, option C above, but for this step – Sam Hughes Dec 17 '21 at 17:57
  • i am going to accept the answer by lkatiforis. But thanks for explaining the concept in more generic way. – Harry Dec 18 '21 at 20:31
  • @harry, oh, heck yeah. I just upvoted his answer, too. – Sam Hughes Dec 19 '21 at 21:05