1

I am trying to print a counter which increments from a doOnNext() function.

    public static void main(String [] args) {
        final AtomicInteger counter = new AtomicInteger();
        Mono.just(List.of(1))
                .doOnNext(i -> System.out.println("doOnNext " + counter.incrementAndGet()))
                .map(i -> {
                    System.out.println("map " + counter.get());
                    return i;
                })
                .then(thenFunction(counter))
                .block();
    }

    private static Mono<Integer> thenFunction(final AtomicInteger counter) {
        System.out.println("then " + counter.get());
        return Mono.just(2);
    }

From the documentation, what the then() function does is

Let this Flux complete then play signals from a provided Mono.

So I should be expecting the doOnNext() and map() to complete before the then() works. However, the output is

then 0
doOnNext 1
map 1

Shouldn't the then() wait till the upstream completes before processing?

I am using spring-boot-starter-webflux 2.3.8.RELEASE

1 Answers1

3

In your example, the thenFunction method is invoked during the assembly phase while constructing the stream. Therefore, System.out.println("then " + counter.get()); is invoked before elements start flowing through the stream.

To defer the logic within the thenFunction until subscription time, you can either wrap the thenFunction call in a Mono.defer, like this:

                .then(Mono.defer(() -> thenFunction(counter)))

Or, (preferred) you can make the thenFunction defer its logic internally until the returned Mono is subscribed, like this:

    private static Mono<Integer> thenFunction(final AtomicInteger counter) {
        return Mono.fromCallable(() -> {
            System.out.println("then " + counter.get());
            return 2;
        });
    }
Phil Clay
  • 4,028
  • 17
  • 22
  • ah! That is good insight. Just one question about why you would prefer Mono.fromCallable() over Mono.defer() ? It seemed that the latter function looked more clean and they achieve the same thing (fire the supplier function when downstream subscribes) – Ketone Maniac Jan 27 '21 at 06:42
  • @KetoneManiac because one is a Hot publisher and the other is a Cold publisher https://projectreactor.io/docs/core/release/reference/#reactor.hotCold – Toerktumlare Jan 27 '21 at 08:11
  • So if I understood this correctly, it is better to do Mono.fromCallable() which in itself is a cold publisher, rather than making a hot publisher (Mono.just(2)) and then explicitly making it cold (Mono.defer()) ? – Ketone Maniac Jan 27 '21 at 10:06
  • I think the more general recommendation is to follow the reactive principle of "nothing happens until you subscribe". Putting logic in reactive methods (other than just building the stream) that happens during the assembly phase is a recipe for disaster. Not only do you get the weird behavior that you mentioned, but you also lose the ability to take advantage of some of the valuable resiliency operators like `.timeout`, `.retry*`, or `.repeat*`, since those operators do not affect assembly time logic. – Phil Clay Jan 27 '21 at 18:00