
I would like to transform/add the Mono from WebClient response into a Map with the input as a key

I am executing a batch of REST calls in parallel using WebClient but instead of returning the list of Users I would like to return a HashMap of ID as the key and the User returned from REST call as the value.

I don't want to block every individual call to get the value before I add to the HashMap.

Is there a way I can transform the result from WebClient into HashMap entry without impacting the parallel execution of the REST calls?

I tried the doOnSuccess callback on Mono, but I'm not sure that's really the right way to do it.

Current Implementation

public List<User> fetchUsers(List<Integer> ids) {
        List<Mono<User>> userMonos = new ArrayList<>();
        
        for (int id : ids) {
            userMonos.add(webClient.get()
                .uri("/otheruser/{id}", id)
                .retrieve()
                .bodyToMono(User.class));
        }

       List<User> list = Flux.merge(userMonos).collectList().block();
       return list; 
    }

So the expected output is:

HashMap<Integer, User>()

I apologize if I wasn't able to express the expected result appropriately. Feel free to let me know if I need to add more detail or add more clarity to the question.

I would really appreciate some help with this. I am also trying to keep looking for a solution in the meantime.

Nick Div

2 Answers


You are mixing imperative code with reactive code. You have to pick one way and stick to it.

If you want the actual values and not a Mono or Flux, you MUST block. Think of it as a Future: there is no "value" there until we wait for the value to show up. So blocking is the ONLY way to get the value out.
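The Future analogy can be sketched with the JDK's own CompletableFuture rather than Reactor. This is only an illustration of the idea, not the WebClient API: `FutureAnalogy`, `fetchAll` and the `fetchUser` function are hypothetical names standing in for the remote call. Every call is started first, and the code waits exactly once at the end.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FutureAnalogy {

    // Starts one asynchronous call per id, then waits once for all of them.
    // `fetchUser` is a hypothetical stand-in for the remote call.
    static <U> Map<Integer, U> fetchAll(List<Integer> ids,
                                        Function<Integer, CompletableFuture<U>> fetchUser) {
        // Nothing blocks here: every call is started before any result is read.
        Map<Integer, CompletableFuture<U>> pending = new LinkedHashMap<>();
        for (Integer id : ids) {
            pending.put(id, fetchUser.apply(id));
        }

        // The single blocking step: wait until every future has completed.
        CompletableFuture.allOf(pending.values().toArray(new CompletableFuture[0])).join();

        // All futures are done now, so join() returns immediately for each entry.
        Map<Integer, U> result = new LinkedHashMap<>();
        pending.forEach((id, future) -> result.put(id, future.join()));
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> users = fetchAll(
                Arrays.asList(1, 2, 3),
                id -> CompletableFuture.supplyAsync(() -> "user-" + id));
        System.out.println(users);
    }
}
```

The point is the same as with Mono: the id is paired with its result without blocking per call, and the only wait happens once, at the boundary where plain values are needed.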

If I understand your code correctly, I would do the following.

public Map<Integer, User> fetchUsers(List<Integer> ids) {
    final Map<Integer, User> userMap = new HashMap<>();
    return Flux.fromIterable(ids)
                .flatMap(id -> webClient.get()
                    .uri("/otheruser/{id}", id)
                    .retrieve()
                    .bodyToMono(User.class)
                    // side effect: store each user under the id that was requested
                    .doOnSuccess(user -> {
                        userMap.put(id, user);
                    }))
                // ignore the emitted users and return the filled map instead
                .then(Mono.just(userMap))
                .block();
}

So what does this code do?

It takes the list of ids and puts it into a Flux. Because we use flatMap, the Flux starts the requests asynchronously, without waiting for one to finish before starting the next. As each request completes, we perform a side effect by adding the value to the HashMap. Since we don't care about the values the Flux emits, we use then to silently ignore them and tell it to return the HashMap instead. Lastly, we call block to make the code actually run, wait for all the requests to finish, and produce the final HashMap.

I have written this on mobile, so I can't check it against a compiler, but something like this should get you started. If someone sees any errors, feel free to edit.

Toerktumlare
  • I am new to WebFlux, so I'm not that fluent and was just going by the examples I found online. My only concern was making sure the requests are executed in parallel instead of running in sequence. If the above does that, that's all I need. Really appreciate the help. Thank you – Nick Div Aug 09 '21 at 22:37
  • There is a huge difference between parallel and asynchronous. The code above is asynchronous, as in WebFlux will schedule all the REST calls on the event loop and not block while waiting for the responses. What the code does not do is send your requests out to all cores and run them in parallel on multiple cores. Running things in parallel on multiple cores is usually only beneficial if you have CPU-heavy tasks. I/O usually doesn't benefit much from being run in parallel, because there is overhead in setting up multiple cores and sending the data out to all of them. – Toerktumlare Aug 09 '21 at 23:30
  • I/O stuff is more about orchestration, and not raw CPU power on multiple cores. `flatMap` is async, meaning it will not preserve order and will just try to do all requests as fast as it can, in no particular order. – Toerktumlare Aug 09 '21 at 23:32
  • I highly suggest you check out the `reactor reference documentation` if you want to understand the basics of reactive programming. – Toerktumlare Aug 09 '21 at 23:38
  • Note that having external state like that Map outside of the reactive chain can be problematic. This example is only valid because the code guarantees that only a single subscription can happen to the flux (the subscription from `block()`), so there is no risk the `Map` is shared between multiple subscriptions. – Simon Baslé Aug 11 '21 at 10:31
  • I completely agree, mutability is always problematic. But as you pointed out, here it is "safe" since it's not a shared resource. `collectMap` is most likely a better choice. – Toerktumlare Aug 11 '21 at 12:19
  • This helped me! Been struggling for awhile on this. Thanks OP for asking, I basically have the same style problem. – TrollBearPig Nov 01 '22 at 03:44

If possible, it's best to avoid modifying external state from side-effect operators like doOnSuccess. In this particular case, for example, it could cause concurrency issues if the external Map is not thread-safe.

As a better alternative, you can collect to a Map using a Reactor operator:

public Map<Integer, User> fetchUsers(List<Integer> ids) {
    return Flux.fromIterable(ids)
               .flatMap(id -> webClient.get()
                                       .uri("/otheruser/{id}", id)
                                       .retrieve()
                                       .bodyToMono(User.class)
                                       .map(user -> Tuples.of(id, user)))
               .collectMap(Tuple2::getT1, Tuple2::getT2)
               .block();
}

Instead of Tuple you might create a small class to improve readability. Or even better, if the User knows its ID, then you can omit Tuple completely and you can do something like .collectMap(User::getId, user -> user).
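As a rough sketch of the "small class" variant, assuming reactor-core is on the classpath: `UserLookup`, `UserById` and the `fetchUser` function are hypothetical names, with `fetchUser` standing in for the webClient.get()...bodyToMono(User.class) call, and the `User` class below is a placeholder for the real DTO.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class UserLookup {

    // Placeholder DTO that, as in the question, does not carry its own id.
    static final class User {
        final String name;
        User(String name) { this.name = name; }
    }

    // Small named pair used in place of Tuple2 (hypothetical helper).
    static final class UserById {
        private final Integer id;
        private final User user;
        UserById(Integer id, User user) { this.id = id; this.user = user; }
        Integer getId() { return id; }
        User getUser() { return user; }
    }

    // `fetchUser` stands in for the WebClient call that returns Mono<User>.
    static Map<Integer, User> fetchUsers(List<Integer> ids,
                                         Function<Integer, Mono<User>> fetchUser) {
        return Flux.fromIterable(ids)
                // pair each response with the id that was requested
                .flatMap(id -> fetchUser.apply(id).map(user -> new UserById(id, user)))
                // build the map inside the chain, with no external mutable state
                .collectMap(UserById::getId, UserById::getUser)
                .block();
    }

    public static void main(String[] args) {
        Map<Integer, User> users = fetchUsers(
                Arrays.asList(1, 2, 3),
                id -> Mono.fromSupplier(() -> new User("user-" + id)));
        users.forEach((id, user) -> System.out.println(id + " -> " + user.name));
    }
}
```

With this shape, an id whose call completes empty simply does not appear in the resulting map, because the inner Mono emits nothing for flatMap to pass on.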

Martin Tarjányi
  • The User object returned by the Service is a DTO and does not contain its own ID :( But what exactly is getT1 and getT2 doing? Is this also going to trigger all REST requests in parallel? – Nick Div Aug 09 '21 at 22:40
  • This does the same thing as the above answer, just collecting the results in a map. It does all the requests async, because `flatMap` is async. `getT1` and `getT2` are just functions on the `Tuple2` to get the first or second value in the tuple. If you don't know what a tuple is, you can read about it here https://stackoverflow.com/questions/626759/whats-the-difference-between-lists-and-tuples. It's technically an immutable array of items, an array you can't change the values in. So here he is creating a `Tuple` with the id as the first value and the user as the second value, and then he gets the values. – Toerktumlare Aug 09 '21 at 23:36