I expected Reactor to use a separate thread for each emitted element, so that elements 1, 2, 3, 4, 5 from the source would be handled concurrently. The output of my demo code shows this is not the case, and I don't know why. Could someone take a look and explain two things:

  1. Why does Reactor handle the elements synchronously in my demo code?
  2. How can I make Reactor handle each element concurrently?

Although the Reactor chain in the demo code below runs asynchronously with respect to the main thread, the elements of the source Flux are still processed one after another.

Here is my demo code:

        System.out.println("main thread start ...");

        Flux.range(1, 5)
                .flatMap(num -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return Mono.just(num);
                })
                .flatMap(num -> Mono.just(num * 10))
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(res -> System.out.println("Thread name:" + Thread.currentThread().getName() + " value:" + res));

        System.out.println("main thread sleep a little");
        Thread.sleep(4000);
        System.out.println("main thread end ...");

Here is the output:

        Output:
            main thread start ...
            main thread sleep a little
            0. element: 0
            1. element: 1
            main thread end ...
            2. element: 2
LomoY

1 Answer

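Your code is not really reactive, especially in how it wraps the non-reactive (blocking) call: `Thread.sleep` runs directly inside the `flatMap` lambda, so each element blocks the single subscribing thread before `Mono.just` is even created. Consider this slightly reworked example: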

@Test
void concurrent() {
    Flux<Integer> stream = Flux.range(1, 50)
            .flatMap(this::process)
            .flatMap(num -> Mono.just(num * 10));

    StepVerifier.create(stream)
            .expectNextCount(50)
            .verifyComplete();
}

private Mono<Integer> process(int num) {
    return Mono.fromCallable(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return num;
    })
    .log()
    .subscribeOn(Schedulers.boundedElastic());
}

If you run it, you will see execution happening on multiple threads, up to `flatMap`'s default concurrency of `Queues.SMALL_BUFFER_SIZE` (256 unless overridden):

22:54:26.066  [boundedElastic-50] INFO [r.M.L.50] - | request(32)
22:54:27.038  [boundedElastic-6] INFO [r.M.L.6] - | onNext(6)
22:54:27.038  [boundedElastic-11] INFO [r.M.L.11] - | onNext(11)
22:54:27.038  [boundedElastic-2] INFO [r.M.L.2] - | onNext(2)
22:54:27.038  [boundedElastic-9] INFO [r.M.L.9] - | onNext(9)
22:54:27.038  [boundedElastic-3] INFO [r.M.L.3] - | onNext(3)
22:54:27.040  [boundedElastic-13] INFO [r.M.L.13] - | onNext(13)
22:54:27.040  [boundedElastic-7] INFO [r.M.L.7] - | onNext(7)
22:54:27.040  [boundedElastic-20] INFO [r.M.L.20] - | onNext(20)
22:54:27.041  [boundedElastic-6] INFO [r.M.L.6] - | onComplete()
22:54:27.041  [boundedElastic-13] INFO [r.M.L.13] - | onComplete()
22:54:27.043  [boundedElastic-2] INFO [r.M.L.2] - | onComplete()
22:54:27.043  [boundedElastic-3] INFO [r.M.L.3] - | onComplete()
22:54:27.043  [boundedElastic-20] INFO [r.M.L.20] - | onComplete()
22:54:27.043  [boundedElastic-11] INFO [r.M.L.11] - | onComplete()
22:54:27.043  [boundedElastic-7] INFO [r.M.L.7] - | onComplete()
22:54:27.044  [boundedElastic-9] INFO [r.M.L.9] - | onComplete()
22:54:27.045  [boundedElastic-1] INFO [r.M.L.1] - | onNext(1)
22:54:27.045  [boundedElastic-5] INFO [r.M.L.5] - | onNext(5)
22:54:27.045  [boundedElastic-15] INFO [r.M.L.15] - | onNext(15)
22:54:27.045  [boundedElastic-1] INFO [r.M.L.1] - | onComplete()
22:54:27.045  [boundedElastic-15] INFO [r.M.L.15] - | onComplete()
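
To isolate what differs from the question's code, here is a minimal sketch (not part of the original example) that runs both variants side by side. The class name `FlatMapConcurrencyDemo` and the helper `slowIdentity` are made up for illustration, and it assumes `reactor-core` is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapConcurrencyDemo {

    // Hypothetical stand-in for a blocking call (same 1 s sleep as in the question).
    static int slowIdentity(int num) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return num;
    }

    // Question's shape: the blocking call runs inside the flatMap lambda,
    // on the one thread chosen by subscribeOn, so elements are handled in turn.
    static List<Integer> sequential() {
        return Flux.range(1, 5)
                .flatMap(num -> Mono.just(slowIdentity(num)))
                .map(num -> num * 10)
                .subscribeOn(Schedulers.boundedElastic())
                .collectList()
                .block();
    }

    // Answer's shape: the blocking call is deferred with fromCallable and each
    // inner Mono gets its own subscribeOn, so the inner Monos can run in parallel.
    static List<Integer> concurrent() {
        return Flux.range(1, 5)
                .flatMap(num -> Mono.fromCallable(() -> slowIdentity(num))
                        .subscribeOn(Schedulers.boundedElastic()))
                .map(num -> num * 10)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        System.out.println("sequential: " + sequential());
        long t1 = System.nanoTime();
        System.out.println("concurrent: " + concurrent());
        long t2 = System.nanoTime();
        System.out.printf("sequential ~%d ms, concurrent ~%d ms%n",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
    }
}
```

The sequential variant should take roughly five seconds and the concurrent one roughly one second, although exact timings depend on the machine. The order of the values in the concurrent result is not guaranteed; `flatMapSequential` is an option if the source order has to be preserved.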

Also, see "Flight of the Flux 3 - Hopping Threads and Schedulers" for a more detailed explanation and more examples.
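
If the default `flatMap` concurrency is more than a downstream resource can handle, it can be capped with the `flatMap(mapper, concurrency)` overload. The sketch below is an illustration under assumptions, not part of the original example: the class `FlatMapLimitDemo`, the helper `blockingCall`, and the in-flight counters are hypothetical names used only to make the cap observable.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapLimitDemo {

    // Counters used only to observe how many calls overlap (illustrative).
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical blocking call that records the number of concurrent callers.
    static int blockingCall(int num) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
        return num;
    }

    // Processes six elements with at most `concurrency` inner Monos subscribed at once.
    static List<Integer> run(int concurrency) {
        inFlight.set(0);
        maxInFlight.set(0);
        return Flux.range(1, 6)
                .flatMap(num -> Mono.fromCallable(() -> blockingCall(num))
                        .subscribeOn(Schedulers.boundedElastic()), concurrency)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<Integer> result = run(2);
        System.out.println("result: " + result);
        System.out.println("max calls in flight: " + maxInFlight.get());
    }
}
```

With a concurrency of 2, `flatMap` subscribes to a new inner `Mono` only after one of the two active ones completes, so the recorded maximum should never exceed 2.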

Alex
  • Thank you @Alex, I'll check the page you shared. I notice the main difference between your demo and mine is that your code returns `Mono.fromCallable` and mine returns `Mono.just()`. Is this one reason (or the only reason) your demo runs concurrently and mine does not? – LomoY Apr 19 '22 at 13:32
  • @LomoY https://stackoverflow.com/questions/2521277/what-are-the-hot-and-cold-observables – chrylis -cautiouslyoptimistic- Apr 19 '22 at 13:56