4

My understanding from looking through the API is that using Schedulers.boundedElastic() or variants like Schedulers.newBoundedElastic(3, 10, "MyThreadGroup"); or Schedulers.fromExecutor(executor) allows for processing an IO operation in more than one thread.

But a simulation with the following sample code appears to indicate a single thread/same thread is doing the work in the flatMap

Flux.range(0, 100)
                .flatMap(i -> {
                    try {
                        // IO operation
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
                    return Flux.just(i);
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();

Thread.sleep(10000); // main thread

//This yields the following

Mapping for 0 is done by thread boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Mapping for 2 is done by thread boundedElastic-1
Mapping for 3 is done by thread boundedElastic-1 ...

The above output suggests to me the same Thread is running within the flatMap. Is there a way to get more than one Thread to process items when the flatMap is invoked on subcribe for multiple IO? I was expecting to see boundedElastic-1, boundedElastic-2 ... .

Dayo
  • 121
  • 2
  • 7
  • possible duplicate https://stackoverflow.com/questions/65689935/do-you-have-a-test-to-show-differences-between-the-reactor-map-and-flatmap/65692723#65692723 – Toerktumlare Mar 17 '21 at 15:23
  • flatMap is `async` it is not `parallel` and chances are you wont gain anything from using parallel execution unless you have heavy cpu computational stuff, for I/O work like file reading or rest calls, threads will mostly be waiting for responses, which does not need parallel execution, but instead optimised async execution which flatMap offers. – Toerktumlare Mar 17 '21 at 15:29
  • I would think one can gain from multiple Threads processing independent emitted elements needing IO operations to be performed. – Dayo Mar 17 '21 at 15:48
  • I have not claimed that multiple threads are not necassary. Where did you get that from? Threads are essential, but in reactor threads dont do the work, threads schedule work. The event loop does the work and it is single threaded. What is most efficiant is up to the work that is done. On high loads in a webflux application the framwork will schedule work as efficiant as possible. – Toerktumlare Mar 17 '21 at 17:06
  • There is a huge difference between async work and parallel work. – Toerktumlare Mar 17 '21 at 17:07

2 Answers2

8

1. Concurrency with non-blocking IO (preferred)

If you have the chance to use non-blocking IO (like Spring WebClient), then you don't need to worry about threads or schedulers and you get concurrency out of the box:

Flux.range(0, 100)
        .flatMap(i -> Mono.delay(Duration.ofMillis(500)) // e.g.: reactive webclient call
                .doOnNext(x -> System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread()
                        .getName())))
        .subscribe();

2. Concurrency with blocking IO

It's better to avoid blocking IO if you have the choice. In case you can't avoid it, you just need to make a slight modification to your code and apply subscribeOn to the inner Mono:

Flux.range(0, 100)
        .flatMap(i -> Mono.fromRunnable(() -> {
            try {
                // IO operation
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
        }).subscribeOn(Schedulers.boundedElastic()))
        .subscribe();
Martin Tarjányi
  • 8,863
  • 2
  • 31
  • 49
2

One way to get the flatMap running on multiple Threads is to create a ParallelFlux. The sample code below does the trick.

Flux.range(0, 1000)
                .parallel()             
                .runOn(Schedulers.boundedElastic())
                .flatMap(i -> {
                    try {
                        // IO operation
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("second Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
                    return Flux.just(i);
                })
                .subscribe();
        
        Thread.sleep(10000);

Dayo
  • 121
  • 2
  • 7