
I've found a lot of answers regarding RxJava, but I want to understand how map and flatMap differ in Reactor.

My current understanding is very vague: I tend to think of map as synchronous and flatMap as asynchronous, but I can't really get my head around it.

Here is an example:

files.flatMap { filePart ->
    Mono.just(Paths.get(UPLOAD_ROOT, filePart.filename()).toFile())
        .map { destFile ->
            destFile.createNewFile()
            destFile
        }
        .flatMap(filePart::transferTo)
}.then()

I have files (a Flux<FilePart>) and I want to copy them to some UPLOAD_ROOT on the server.

This example is taken from a book.

I can change all the .map to .flatMap and vice versa and everything still works. I wonder what the difference is.

shredding
  • Can you be more specific about your actual question? To which methods you are actually referring to, there are multiple `map` and `flatMap` methods in Java. – Zabuzard Mar 05 '18 at 16:43
  • I'm talking about the Reactor project. https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html – shredding Mar 05 '18 at 16:48
  • Yeah, now please be more specific about the question. Where exactly you have difficulties understanding it? Why doesn't taking a look at the source code and the documentation clear the confusion? Is there something specific you don't get? – Zabuzard Mar 05 '18 at 16:52
  • I understand from the docs that both are ways to iterate over a given Flux and that `map` is synchronous and `flatMap` is not. But I also understand that the function I pass to `map` is executed async, and I don't know when to use which one. – shredding Mar 05 '18 at 16:56
  • ... I'll update with a concrete example. – shredding Mar 05 '18 at 16:57
  • I did update the sample ... – shredding Mar 05 '18 at 17:02
  • you mention finding a lot of answers regarding RxJava. Note that this is the exact same concept as in RxJava – Simon Baslé Mar 08 '18 at 09:04
  • Add a tag `kotlin` probably? Since your example is in this language and not in Java? – Inego Sep 04 '19 at 01:42

3 Answers

  • map is for synchronous, non-blocking, 1-to-1 transformations
  • flatMap is for asynchronous (non-blocking) 1-to-N transformations

The difference is visible in the method signature:

  • map takes a Function<T, U> and returns a Flux<U>
  • flatMap takes a Function<T, Publisher<V>> and returns a Flux<V>

That's the major hint: you can pass a Function<T, Publisher<V>> to a map, but it wouldn't know what to do with the Publishers, and that would result in a Flux<Publisher<V>>, a sequence of inert publishers.

On the other hand, flatMap expects a Publisher<V> for each T. It knows what to do with it: subscribe to it and propagate its elements in the output sequence. As a result, the return type is Flux<V>: flatMap will flatten each inner Publisher<V> into the output sequence of all the Vs.
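To make the signature difference concrete, here is a minimal Java sketch. The lookup method is a hypothetical stand-in for any call that returns a Publisher (here a Mono). With map, the Monos come out as inert elements that nobody subscribes to; with flatMap, each one is subscribed to and its value is merged into the output.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MapVsFlatMap {

    // Hypothetical async lookup: returns a Publisher (a Mono) for each input.
    static Mono<String> lookup(int id) {
        return Mono.just("item-" + id);
    }

    // map: the function's Mono results are emitted as-is, giving a Flux<Mono<String>>.
    static Flux<Mono<String>> withMap(Flux<Integer> ids) {
        return ids.map(MapVsFlatMap::lookup);
    }

    // flatMap: each inner Mono is subscribed to and its value is flattened into a Flux<String>.
    static Flux<String> withFlatMap(Flux<Integer> ids) {
        return ids.flatMap(MapVsFlatMap::lookup);
    }

    public static void main(String[] args) {
        List<Mono<String>> inert = withMap(Flux.just(1, 2, 3)).collectList().block();
        List<String> flattened = withFlatMap(Flux.just(1, 2, 3)).collectList().block();
        System.out.println(inert.size()); // 3 Mono objects, not strings
        System.out.println(flattened);
    }
}
```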

About the 1-N aspect:

For each <T> input element, flatMap maps it to a Publisher<V>. In some cases (e.g. an HTTP request), that publisher will emit only one item, in which case we're pretty close to an async map.

But that's the degenerate case. The generic case is that a Publisher can emit multiple elements, and flatMap works just as well.

For example, imagine you have a reactive database and you flatMap from a sequence of user IDs, with a request that returns a user's set of Badges. You end up with a single Flux<Badge> of all the badges of all these users.
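A small sketch of that 1-to-N case, assuming an in-memory Map as a hypothetical stand-in for the reactive database and plain Strings in place of a Badge type. Three user IDs go in, four badges come out, and a user with no badges simply contributes nothing.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

public class BadgesExample {

    // Hypothetical stand-in for a reactive database: user ID -> that user's badges.
    static final Map<String, List<String>> BADGES = Map.of(
            "alice", List.of("gold", "silver", "bronze"),
            "bob", List.of("bronze"),
            "carol", List.of());

    // Hypothetical query: emits zero or more badges for one user.
    static Flux<String> findBadges(String userId) {
        return Flux.fromIterable(BADGES.getOrDefault(userId, List.of()));
    }

    // One user ID can produce zero, one or many badges in the merged output.
    static Flux<String> allBadges(Flux<String> userIds) {
        return userIds.flatMap(BadgesExample::findBadges);
    }

    public static void main(String[] args) {
        Long count = allBadges(Flux.just("alice", "bob", "carol")).count().block();
        System.out.println(count); // 4 badges from 3 users
    }
}
```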

Is map really synchronous and non-blocking?

Yes: it is synchronous in the way the operator applies it (a simple method call, after which the operator emits the result) and non-blocking in the sense that the function itself shouldn't block the operator calling it. In other words, it shouldn't introduce latency. That's because a Flux is still asynchronous as a whole: if it blocks mid-sequence, it will impact the rest of the Flux processing, or even other Fluxes.

If your map function is blocking or introduces latency but cannot be converted to return a Publisher, consider publishOn/subscribeOn to offload that blocking work to a separate thread.
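A sketch of both options, assuming a Reactor version that provides Schedulers.boundedElastic() (3.3+). blockingLookup is a hypothetical blocking call simulated with Thread.sleep. Option 1 keeps map but moves downstream processing onto a scheduler meant for blocking work; option 2 wraps each call in Mono.fromCallable and subscribes to it on that scheduler, then uses flatMap.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class OffloadBlocking {

    // Hypothetical blocking call (e.g. a legacy JDBC or file API).
    static String blockingLookup(int id) {
        try {
            Thread.sleep(50); // simulated latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + id;
    }

    // Option 1: publishOn switches the thread for everything downstream, including map.
    static Flux<String> withPublishOn(Flux<Integer> ids) {
        return ids.publishOn(Schedulers.boundedElastic())
                  .map(OffloadBlocking::blockingLookup);
    }

    // Option 2: each blocking call becomes a Mono subscribed on boundedElastic; output order may vary.
    static Flux<String> withSubscribeOn(Flux<Integer> ids) {
        return ids.flatMap(id -> Mono.fromCallable(() -> blockingLookup(id))
                                     .subscribeOn(Schedulers.boundedElastic()));
    }

    public static void main(String[] args) {
        System.out.println(withPublishOn(Flux.just(1, 2, 3)).collectList().block());
        System.out.println(withSubscribeOn(Flux.just(1, 2, 3)).collectList().block());
    }
}
```

Option 2 lets the blocking calls overlap, while option 1 still processes elements one at a time, just off the original thread.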

Simon Baslé
  • You meant map is `blocking`, right? I also don't really get the `1-to-N` part. Can you give an example where one is more useful than the other? I understand that I use flatMap when I expect the result to be asynchronous, because it flattens the publishers once the result is there. Is that correct? – shredding Mar 08 '18 at 10:31
  • No, the map function should be non-blocking (unless you also offload the work to a separate thread using `publishOn`/`subscribeOn`). That is, it is executed synchronously but shouldn't introduce latency. The flatMap function is asynchronous, and the operator indeed flattens the results as they become available. – Simon Baslé Mar 08 '18 at 11:34
  • Edited the answer to explain both these aspects + a flatMap 1-N example – Simon Baslé Mar 08 '18 at 11:45
  • Is this answer out of date? I think what you're calling flatMap is now "flatMapMany", and flatMap does something different -- https://github.com/reactor/reactor-core/issues/516 – Hazel T Nov 16 '18 at 22:07
  • Just that Mono now has the added subtlety of having both flatMap (async 1-to-1 transformation) and flatMapMany (async 1-to-N) – Simon Baslé Nov 17 '18 at 14:51
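A short sketch of the two Mono variants mentioned in the last comment. The inputs are arbitrary strings chosen for illustration: Mono.flatMap keeps the result a Mono (async 1-to-1), while Mono.flatMapMany accepts any Publisher and returns a Flux (async 1-to-N).

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFlatMapVariants {

    // Mono.flatMap: the function returns a Mono, and the result is still a Mono.
    static Mono<Integer> lengthAsync(Mono<String> word) {
        return word.flatMap(w -> Mono.just(w.length()));
    }

    // Mono.flatMapMany: the function returns any Publisher, and the result is a Flux.
    static Flux<String> letters(Mono<String> word) {
        return word.flatMapMany(w -> Flux.fromArray(w.split("")));
    }

    public static void main(String[] args) {
        System.out.println(lengthAsync(Mono.just("reactor")).block());       // 7
        System.out.println(letters(Mono.just("abc")).collectList().block()); // [a, b, c]
    }
}
```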

The flatMap method is similar to the map method, with the key difference that the function you pass to it must return a Mono<T> or Flux<T>.

Using the map method would result in a Mono<Mono<T>> whereas using flatMap results in a Mono<T>.

For example, it is useful when you have to make a network call to retrieve data, with a Java API that returns a Mono, and then another network call that needs the result of the first one.

// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);

// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end

// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl)
    .map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results in a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono

// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl)
    .flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected

flatMap also allows for handling errors precisely, because the function can return Mono.error(...) for some inputs and Mono.just(...) for others:

public class UserApi {

  // Hypothetical blocking client: get(...) returns a response with statusCode and data
  private HttpClient httpClient;

  Mono<User> findUser(String username) {
    String queryUrl = "http://my-api-address/users/" + username;

    return Mono.fromCallable(() -> httpClient.get(queryUrl))
      .flatMap(response -> {
        // Turn each non-success status into a specific error signal
        if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
        else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
        else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
        return Mono.just(response.data);
      });
  }
}
Raouf Makhlouf

How map works internally in Reactor

[Image: How map works internally]

First, create a Player class:

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Player {
        String firstName;
        String lastName;
}

Now create a Flux of Player instances by mapping over full names:

Flux<Player> players = Flux.just(
        "Zahid Khan",
        "Arif Khan",
        "Obaid Sheikh")
        .map(fullname -> {
            String[] split = fullname.split("\\s");
            return new Player(split[0], split[1]);
        });

StepVerifier.create(players)
          .expectNext(new Player("Zahid", "Khan"))
          .expectNext(new Player("Arif", "Khan"))
          .expectNext(new Player("Obaid", "Sheikh"))
          .verifyComplete();

What’s important to understand about map() is that the mapping is performed synchronously, as each item is published by the source Flux. If you want to perform the mapping asynchronously, you should consider the flatMap() operation.

How flatMap works internally

[Image: How flatMap works internally]

Flux<Player> players = Flux.just(
        "Zahid Khan",
        "Arif Khan",
        "Obaid Sheikh")
        .flatMap(fullname ->
                Mono.just(fullname)
                        .map(p -> {
                            String[] split = p.split("\\s");
                            return new Player(split[0], split[1]);
                        })
                        .subscribeOn(Schedulers.parallel()));

List<Player> playerList = Arrays.asList(
        new Player("Zahid", "Khan"),
        new Player("Arif", "Khan"),
        new Player("Obaid", "Sheikh"));

// Emission order is not guaranteed once the inner Monos run on parallel threads,
// so each of the three elements is only checked for membership
StepVerifier.create(players)
        .expectNextMatches(playerList::contains)
        .expectNextMatches(playerList::contains)
        .expectNextMatches(playerList::contains)
        .verifyComplete();

Inside flatMap(), a map() on each inner Mono transforms the String into a Player. subscribeOn() makes each inner subscription (and therefore that map) run on a thread of the parallel scheduler. In the absence of subscribeOn(), the inner Monos are subscribed on the calling thread, so flatMap() behaves synchronously.
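One way to observe this is to record the thread name inside each inner Mono, with and without subscribeOn(). This is a sketch using the same names as above; Thread.currentThread().getName() stands in for the Player mapping, and the parallel scheduler's threads are expected to have names starting with "parallel".

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapThreads {

    static final List<String> NAMES = List.of("Zahid Khan", "Arif Khan", "Obaid Sheikh");

    // Records which thread runs the inner work for each element.
    // offload = false: inner Monos run on the thread that subscribed (here, the caller of block()).
    // offload = true: inner Monos are subscribed on Schedulers.parallel() threads.
    static List<String> innerThreads(boolean offload) {
        return Flux.fromIterable(NAMES)
                .flatMap(fullname -> {
                    Mono<String> inner = Mono.fromCallable(() -> Thread.currentThread().getName());
                    return offload ? inner.subscribeOn(Schedulers.parallel()) : inner;
                })
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(innerThreads(false)); // the calling thread's name, three times
        System.out.println(innerThreads(true));  // names of parallel-scheduler threads
    }
}
```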

map is for synchronous, non-blocking, one-to-one transformations, while flatMap is for asynchronous (non-blocking) one-to-many transformations.
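The Player examples above happen to produce one output per input. A sketch where the counts differ, reusing the same full names: flatMap splits each name into a Flux of its parts (3 inputs, 6 outputs), whereas map can only produce exactly one value per input.

```java
import reactor.core.publisher.Flux;

public class OneToMany {

    // flatMap: each full name becomes a Flux of its parts, so 3 inputs yield 6 outputs.
    static Flux<String> nameParts(Flux<String> fullnames) {
        return fullnames.flatMap(fullname -> Flux.fromArray(fullname.split("\\s")));
    }

    // map: strictly one output per input, so 3 inputs yield 3 outputs.
    static Flux<Integer> partCounts(Flux<String> fullnames) {
        return fullnames.map(fullname -> fullname.split("\\s").length);
    }

    public static void main(String[] args) {
        Flux<String> names = Flux.just("Zahid Khan", "Arif Khan", "Obaid Sheikh");
        System.out.println(nameParts(names).count().block());  // 6
        System.out.println(partCounts(names).count().block()); // 3
    }
}
```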

Zahid Khan
  • Based on the picture In your flatmap example it is possible to have different amount of input and output elements. Could you please provide example ? because in your example you have equal amount (3 input elements and 3 output) elements. – gstackoverflow Jan 10 '23 at 08:10