I have a List<String> containing URLs and I would like to perform a GET request for each URL in that List.

Those requests should be made in parallel. After all the requests are done I would like to have a List<CustomModel> containing all the deserialized responses.

So I created a method to make the HTTP request:

public Flux<JsonNode> performGetRequest(String url) {
    WebClient webClient = WebClient.create(String.format("%s%s", API_BASE_URL, url));

    return webClient.get()
            .retrieve()
            .bodyToFlux(JsonNode.class);
}

The above method is called this way:

public List<CustomModel> fetch(List<String> urls) {
    return Flux.fromIterable(urls)
            .parallel()
            .runOn(Schedulers.boundedElastic())
            .flatMap(this::performGetRequest)
            .flatMap(jsonNode -> Flux.fromIterable(customDeserialize(jsonNode)))
            .sequential()
            .collectList()
            .flatMapMany(Flux::fromIterable)
            .collectList()
            .block();
}

For each response, I use a custom method to deserialize it:

private List<CustomModel> customDeserialize(final JsonNode jsonNodeResponse) {
    List<CustomModel> customModelList = new ArrayList<>();
    
    for (JsonNode block : jsonNodeResponse) {
        // deserialize the response, create an instance of CustomModel class 
        // and add it to customModelList
    }
    
    return customModelList;
}

The problem is that even though I use the parallel() method, the whole process does not seem to run in parallel. The time it takes to complete suggests that I am doing something wrong.

Am I missing something?

Thanos M
  • This is an anti-pattern. The point of Flux/Reactive is that it does the work on only a few threads. You are purposely invoking multiple threads. As the manual says, the purpose of parallelism in reactive is for computationally intensive tasks that need threads. An HTTP request is I/O-bound and not computationally intensive. – K.Nicholas Mar 19 '21 at 16:03
  • @K.Nicholas even though it wouldn't be the point of Flux/Reactive, I can see use cases for HTTP requests sent in parallel, for example if you have a lot of URLs to check. I wouldn't say it's a bad thing or forbidden to do so. Of course, you could do the same with multiple threads in the regular blocking world, too. – eis Mar 19 '21 at 17:06
  • IMHO, if you have multiple HTTP requests you should use `Mono::zip` so that the reactive framework executes them in parallel and returns all the results to you. – K.Nicholas Mar 19 '21 at 17:08
  • @K.Nicholas so to execute multiple HTTP requests in parallel should I prefer the ExecutorService with Callables and a `List>` ? – Thanos M Mar 19 '21 at 17:32
  • `Mono::zip` --- ? – K.Nicholas Mar 19 '21 at 17:35
  • @K.Nicholas The downside of zip is that it will wait for all responses instead of continuing with the ones already returned. – Puce Mar 19 '21 at 18:36
  • @Puce -- this is true though I don't remember seeing anything else obvious and easy for doing the same thing. It seems to me the issue is that each HTTP is returning its own type of response. Perhaps if you wrap responses in a general class you can write a Flux producer but I'm not sure how well that is going to work not having tried it. `Mono::zip` always worked fine for a couple of parallel calls but if you are going to do a LOT of parallel calls to multiple REST services I have to wonder if you can look closer at the system design. – K.Nicholas Mar 19 '21 at 19:50

2 Answers

The problem is that even though I use the parallel() method, the whole process does not seem to run in parallel. The time it takes to complete suggests that I am doing something wrong.

Am I missing something?

Since you are calling block(), I am going to assume you are running an MVC servlet application that uses WebClient only for REST calls.

If you are not running a full WebFlux application, your application will start a single event loop that processes all scheduled events. If you are running a full WebFlux application, you will get as many event loops as there are cores on the machine.

On the use of parallel(), the Reactor documentation says:

To obtain a ParallelFlux, you can use the parallel() operator on any Flux. By itself, this method does not parallelize the work. Rather, it divides the workload into “rails” (by default, as many rails as there are CPU cores).

In order to tell the resulting ParallelFlux where to run each rail (and, by extension, to run rails in parallel) you have to use runOn(Scheduler). Note that there is a recommended dedicated Scheduler for parallel work: Schedulers.parallel().

You are using the boundedElastic scheduler, which is not optimised for parallel work.

But I want to point out something important: you are doing async I/O, not parallel work. You will most likely not see any performance gain from running in parallel, since most of your I/O consists of firing off a request and then just waiting for a response.

ParallelFlux will ensure that all CPU cores are being used, but there are also some penalties. There is a setup cost to get all cores ready to do work, and the work itself is not CPU-intensive: the threads just fire off, say, 1000 requests, then they are all done and have to wait for responses.

Workers need to be set up on the cores, and the data needs to be sent to each core, retrieved, and so on.

parallel() gains most of its benefits when you have CPU-intensive work, where each event requires heavy computation that can be spread across multiple cores. But for async work a regular Flux will most likely be enough.
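To illustrate why waiting on I/O does not need extra threads, here is a minimal JDK-only sketch. It deliberately uses CompletableFuture and a single-threaded timer instead of Reactor and WebClient, and fakeRequest is a hypothetical stand-in for a non-blocking HTTP call (the class name, URLs, and delay are all made up for the illustration). Every "request" is fired up front and the waits overlap, so the total time should be close to one delay rather than the sum of all delays.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class AsyncIoSketch {

    // Hypothetical stand-in for a non-blocking HTTP call: the returned future
    // completes after delayMillis, and no thread is blocked while it waits.
    static CompletableFuture<String> fakeRequest(ScheduledExecutorService timer,
                                                 String url, long delayMillis) {
        CompletableFuture<String> response = new CompletableFuture<>();
        timer.schedule(() -> response.complete("response for " + url),
                delayMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    // Fires all requests first, then collects the responses in input order.
    static List<String> fetchAll(List<String> urls, long delayMillis) {
        // A single thread is enough: it only delivers the simulated responses.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            List<CompletableFuture<String>> pending = urls.stream()
                    .map(url -> fakeRequest(timer, url, delayMillis))
                    .collect(Collectors.toList());
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> urls = IntStream.range(0, 50)
                .mapToObj(i -> "/items/" + i)
                .collect(Collectors.toList());
        long start = System.nanoTime();
        List<String> responses = fetchAll(urls, 200);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(responses.size() + " responses in about " + elapsedMillis + " ms");
    }
}
```

The same idea applies to a plain Flux: as long as the requests are subscribed to concurrently, the event loop can wait for all of them at once, and adding rails or threads does not shorten the wait.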

Here is what Simon Baslé, one of the Reactor developers, has to say about running I/O work in Reactor, parallel vs async

Also worth mentioning: the boundedElastic scheduler is tuned for blocking work, as a fallback to regular servlet behaviour in a pure WebFlux application.

You are running WebFlux in a servlet application, so the benefits you get may not be as full as in a pure WebFlux application.

Toerktumlare

I'm not 100% sure if this is the issue here, but I noticed when working with WebClient and ParallelFlux that WebClient only returns the Publisher for the response (bodyToMono / bodyToFlux), not for the actual request.

Consider wrapping the remote call with Flux.defer / Mono.defer to get a Publisher that already covers the request, e.g. something like:

.flatMap(url -> Flux.defer(() -> performGetRequest(url)))
Puce