The following function:

private Boolean canDoIt(Parameter param) {
  return myService
      .getMyObjectInReactiveWay(param)
      .map(myObject -> myService.checkMyObjectInImperativeWay(myObject))
      .block();
}

works fine at runtime, but when I test a flow that uses it with WebTestClient, I get the following error:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.1.jar:3.4.1]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
    reactor.core.publisher.Mono.flatMap

I know I am not supposed to use block() but I have no other choice: the function has to return a Boolean (not a Mono<Boolean>). Perhaps there is an alternative way to write it which doesn't use block().

Is there a way I can make WebTestClient not throw that error?

Using Reactor Core version 3.4.6.

Marco Lackovic
  • The problem is that Reactor forbids calling `block` on schedulers that are not explicitly marked as compatible with blocking code. I cannot find detailed documentation about that, but I think you have to use either `subscribeOn(Schedulers.boundedElastic())` or the `.share()` method on your flux before the block call. The difference in behaviour might come from the fact that in one case the block function triggers the pipeline in the current thread (not a reactive scheduler), but in your test you call `canDoIt` from a reactive stack, so the executing thread comes from a reactive scheduler. – amanin May 14 '21 at 15:22
  • I tried both with `subscribeOn` and `share` but I still get the same error. Please note that the error does not appear with `WebClient`, it appears only with `WebTestClient`. – Marco Lackovic May 17 '21 at 12:14
  • Maybe I am missing or misunderstanding something. Can you please edit the question to show your attempts with share and subscribeOn? The failing piece of code using WebTestClient and `flatMap` would also be nice. It would help me dig further and adapt my answer. – amanin May 17 '21 at 17:03
  • Also, I've made a rather generic answer because I was not too sure, but if I have the failing code (well, a minimal reproducible example of it), I will be able to write an answer focused on your use case. – amanin May 17 '21 at 17:09

2 Answers

I can confirm my comment. block() checks whether the calling thread is compatible with blocking code (a thread external to Reactor, or a thread of a specific Reactor scheduler like Schedulers.boundedElastic()).
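
To make that check concrete, here is a simplified, JDK-only model of it. This is not Reactor's source code: the names `NonBlockingMarker`, `MarkedThread`, `guardedBlock` and `runOn` are hypothetical, and the sketch assumes the check amounts to testing whether the current thread carries a marker type.

```java
import java.util.concurrent.atomic.AtomicReference;

public class BlockingCheckModel {

    /** Hypothetical marker: threads of this type must never be blocked. */
    interface NonBlockingMarker {}

    /** Hypothetical stand-in for a thread owned by a non-blocking scheduler. */
    static class MarkedThread extends Thread implements NonBlockingMarker {
        MarkedThread(Runnable task, String name) {
            super(task, name);
        }
    }

    /** Stand-in for block(): refuses to run on a marked thread. */
    public static boolean guardedBlock() {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                    "blocking is not supported in thread " + current.getName());
        }
        return true; // a real implementation would wait for the result here
    }

    /** Runs guardedBlock() on a marked or plain thread and reports the outcome. */
    public static String runOn(boolean marked) {
        AtomicReference<String> outcome = new AtomicReference<>();
        Runnable task = () -> {
            try {
                outcome.set("value is " + guardedBlock());
            } catch (IllegalStateException e) {
                outcome.set("ERROR: " + e.getMessage());
            }
        };
        Thread thread = marked
                ? new MarkedThread(task, "marked-1")
                : new Thread(task, "plain-1");
        thread.start();
        try {
            thread.join(); // wait until the task has stored its outcome
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runOn(false)); // plain thread: blocking allowed
        System.out.println(runOn(true));  // marked thread: blocking rejected
    }
}
```

In this model the same call succeeds or fails depending only on which thread executes it, which would explain why the function can work in one context and fail in another.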

There are two ways to handle blocking calls in the middle of a reactive stack:

  • Use the share operator on the publisher you will block upon. Be careful, the share operator caches the value internally.
  • Force the block() call to be executed on a blocking-compatible scheduler using subscribeOn or publishOn. Beware, these operators should not be called on the publisher that directly calls block(), but on the publisher that will "wrap" the block call (see example below).

Some references:

And a minimal reproducible example (tested on v3.4.6) giving this output:

Ok context: not running from reactor Threads
value is true
Problematic stack: working with scheduler not compatible with blocking call
ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2
Bad way to subscribe on a blocking compatible scheduler
ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-4
Bad way to publish on blocking compatible scheduler
ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-6
Possible workaround: share the reactive stream before blocking on it
It worked
Right way to subscribe on blocking compatible scheduler
It worked
Right way to publish on blocking compatible scheduler
It worked

Here comes the code:

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class BlockingWorkaround {

    public static void main(String[] args) throws Exception {
        System.out.println("Ok context: not running from reactor Threads");
        System.out.println("value is "+blockingFunction());

        System.out.println("Problematic stack: working with scheduler not compatible with blocking call");
        executeAndWait(() -> blockingFunction());

        System.out.println("Bad way to subscribe on a blocking compatible scheduler");
        executeAndWait(() -> blockingFunctionUsingSubscribeOn());

        System.out.println("Bad way to publish on blocking compatible scheduler");
        executeAndWait(() -> blockingFunctionUsingPublishOn());

        System.out.println("Possible workaround: share the reactive stream before blocking on it");
        executeAndWait(() -> blockingFunctionShared());

        System.out.println("Right way to subscribe on blocking compatible scheduler");
        subscribeOnAndWait(() -> blockingFunction());

        System.out.println("Right way to publish on blocking compatible scheduler");
        publishOnAndWait(() -> blockingFunction());
    }

    static Boolean blockingFunction() {
        return delay()
                .flatMap(delay -> Mono.just(true))
                .block();
    }

    static Boolean blockingFunctionShared() {
        return delay()
                .flatMap(delay -> Mono.just(true))
                .share() // Mono result is cached internally
                .block();
    }

    static Boolean blockingFunctionUsingSubscribeOn() {
        return delay()
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(delay -> Mono.just(true))
                .block();
    }

    static Boolean blockingFunctionUsingPublishOn() {
        return delay()
                .flatMap(delay -> Mono.just(true))
                .publishOn(Schedulers.boundedElastic())
                .block();
    }

    static Mono<Long> delay() {
        return Mono.delay(Duration.ofMillis(10));
    }

    private static void executeAndWait(Supplier<Boolean> blockingAction) throws InterruptedException {
        delay()
                .map(it -> blockingAction.get())
                .subscribe(
                        val -> System.out.println("It worked"),
                        err -> System.out.println("ERROR: " + err.getMessage())
                );

        Thread.sleep(100);
    }

    private static void subscribeOnAndWait(Callable<Boolean> blockingAction) throws InterruptedException {
        final Mono<Boolean> blockingMono = Mono.fromCallable(blockingAction)
                .subscribeOn(Schedulers.boundedElastic()); // Upstream is executed on given scheduler

        delay()
                .flatMap(it -> blockingMono)
                .subscribe(
                        val -> System.out.println("It worked"),
                        err -> System.out.println("ERROR: " + err.getMessage())
                );

        Thread.sleep(100);
    }

    private static void publishOnAndWait(Supplier<Boolean> blockingAction) throws InterruptedException {
        delay()
                .publishOn(Schedulers.boundedElastic()) // Cause downstream to be executed on given scheduler
                .map(it -> blockingAction.get())
                .subscribe(
                        val -> System.out.println("It worked"),
                        err -> System.out.println("ERROR: " + err.getMessage())
                );

        Thread.sleep(100);
    }
}
amanin
  • I am afraid I fail to see how this solution fits my needs: in your `subscribeOnAndWait` you use a `Mono` and return `void`, I need to return a `Boolean` to the outside caller. – Marco Lackovic May 17 '21 at 12:22
  • My example considers `blockingFunction` to be equivalent to your `canDoIt` function. When this function is called from a reactive context, it might raise the error you are seeing. To avoid that, the call to the function must be wrapped back in a Mono (in my example `blockingMono`), and that Mono must be scheduled on the bounded elastic scheduler. Note that, if it is not possible to identify or modify the places where your `canDoIt` function is injected back into a reactive context, things become more complicated. – amanin May 17 '21 at 17:01
  • Please take a look at https://stackoverflow.com/questions/75058585/block-blockfirst-blocklast-are-blocking-which-is-not-supported-in-thread – gstackoverflow Jan 10 '23 at 15:56
  • I'm just curious to know why it works, in particular what is exactly `Schedulers.boundedElastic()` and why without `.subscribe()` it does not work. – Marco Sulla May 20 '23 at 13:13
  • @MarcoSulla I use subscribe because I wrote an example program, and I need to start the reactive pipeline manually. In a Spring app, you just return the publisher, and Spring will trigger it itself. As for Schedulers.boundedElastic, I cannot really provide a better description than the official API doc (linked in references). It is a worker pool that can spawn a lot of threads/workers, to cope with blocking calls inside a reactive chain. – amanin May 20 '23 at 17:22
  • @amanin Of course I usually return the mono, for example from a REST API, but I checked your solution because I needed the result in another way. What I do not understand is why, without `subscribe()`, the `block()` is not triggered. – Marco Sulla May 21 '23 at 19:31
  • It is just how Reactor works. The flux is not executed until a subscription is triggered. Therefore, `Mono.fromCallable(someMono::block)` is a mono that runs `someMono.block()` each time someone subscribes to it. – amanin May 22 '23 at 16:16

Decrypting amanin's answer, this is her/his TL;DR:

Scheduler scheduler = Schedulers.boundedElastic();

Mono
    .fromCallable(myMono::block)
    .subscribeOn(scheduler)
    .subscribe(val -> {});

Where myMono is the Mono to block.

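It works because block() refuses to run on threads that Reactor marks as non-blocking, such as those of Schedulers.parallel(), which time-based operators like Mono.delay use by default. Threads from Schedulers.boundedElastic() are not marked that way, so blocking is allowed on them.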
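
If the result is needed by a reactive caller rather than discarded in `subscribe`, the same wrapping can return a `Mono<Boolean>`. The following is a minimal sketch, not the asker's real code: `CanDoItSketch`, `canDoItAsync` and the `String` parameter are hypothetical stand-ins, and `canDoIt` here only imitates a function that blocks on a Mono.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class CanDoItSketch {

    // Hypothetical stand-in for the question's canDoIt(): it blocks on a Mono.
    static Boolean canDoIt(String param) {
        return Mono.delay(Duration.ofMillis(10))
                .map(tick -> !param.isEmpty())
                .block();
    }

    // Wraps the blocking call so that it runs on a boundedElastic thread
    // whenever the returned Mono is subscribed to.
    static Mono<Boolean> canDoItAsync(String param) {
        return Mono.fromCallable(() -> canDoIt(param))
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        // The outer block() is called from the main thread, so it is allowed.
        Boolean result = Mono.delay(Duration.ofMillis(10)) // emits on a parallel thread
                .flatMap(tick -> canDoItAsync("some-param"))
                .block(Duration.ofSeconds(5));
        System.out.println("value is " + result);
    }
}
```

The caller composes `canDoItAsync` with `flatMap` instead of calling `canDoIt` inside `map`, so the blocking call never executes on a parallel thread.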

Marco Sulla