I am trying to understand the Java reactive library and I came across the term "concurrency-agnostic" here. Can someone provide a small working example to help me understand it?

Does it mean that the developer has to make sure that the code works correctly and achieves concurrency?

P.S. I am very new to concurrent/parallel applications.

ihaider
  • It means that you can make it concurrent or not, whatever you like – user Jun 01 '20 at 18:34
  • They immediately define it - *"it does not enforce a concurrency model"*. If you want to know more about different concurrency models, see e.g. https://stackoverflow.com/q/31627441/3001761. – jonrsharpe Jun 01 '20 at 18:35
  • @jonrsharpe the link was really useful for a beginner like me to understand "it does not enforce a concurrency model" – ihaider Jun 02 '20 at 00:30

1 Answer

This is what the doc says:

...it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed.

Let's see a couple of examples:

1) Non-concurrent/parallel execution. By default, Project Reactor doesn't enforce concurrency, so the following Mono runs on the thread that subscribed to it (main):

        Logger logger = LoggerFactory.getLogger(MyApplication.class);

        Mono<String> helloMono = Mono.defer(() -> Mono.just("Hello!"))
                .doOnNext(logger::info);

        helloMono.subscribe();

Logs:

22:06:15.235 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
22:06:19.841 [main] INFO MyApplication - Hello!

2) Parallel execution. We can make the Mono execute on a different thread with the subscribeOn() operator. Notice that the Mono runs on the elastic-2 thread, so we need a CountDownLatch to make the main thread wait for it:

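        Logger logger = LoggerFactory.getLogger(MyApplication.class);
        // The latch keeps the main thread waiting until the Mono terminates on the other thread.
        CountDownLatch latch = new CountDownLatch(1);

        Mono<String> helloMono = Mono.defer(() -> Mono.just("Hello!"))
                .doOnNext(logger::info)
                .doOnTerminate(latch::countDown)
                // Schedulers.elastic() is deprecated since Reactor 3.4 in favor of Schedulers.boundedElastic().
                .subscribeOn(Schedulers.elastic());

        helloMono.subscribe();

        latch.await(); // throws InterruptedException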

Logs:

22:11:26.704 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
22:11:26.733 [elastic-2] INFO MyApplication - Hello!

Frameworks such as WebFlux use an approach similar to the second scenario: I/O operations free the calling thread (the HTTP threads) and run in parallel on other thread pools.
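
If you want to try the idea without adding Reactor to a project, here is a rough plain-JDK analogy (not Reactor code). The class and method names are made up for illustration, and CompletableFuture stands in for the Mono. The pipeline in greetOn() never picks a thread itself; the caller decides by passing an Executor, which is roughly the role subscribeOn() plays in the examples above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class, only an analogy for "concurrency-agnostic" using the JDK.
public class ConcurrencyAgnosticSketch {

    // The pipeline does not choose a thread; whoever calls it supplies the Executor.
    static String greetOn(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> "Hello! from " + Thread.currentThread().getName(), executor)
                .join(); // wait for the result so we can return it
    }

    // Scenario 1: a "direct" executor runs the task on the calling thread.
    static String greetOnCallerThread() {
        return greetOn(Runnable::run);
    }

    // Scenario 2: the same pipeline, but executed on a separate worker thread.
    static String greetOnWorkerThread() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            return greetOn(pool);
        } finally {
            pool.shutdown(); // let the JVM exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println(greetOnCallerThread());
        System.out.println(greetOnWorkerThread());
    }
}
```

The first call reports the name of whichever thread invoked it, and the second reports worker-1. The pipeline code is identical in both cases; only the executor passed in changes.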

codependent