2

Consider the following code:

@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {

    @Test
    public void testOnErrorResume() {
        Flux.range(0, 5)
                .doOnNext(event -> log.info("Processing -  {}", event))
                .flatMap(event -> processEvent(event)
                        .doOnSuccess(result -> log.info("Processed - {}", event))
                        .onErrorResume(t -> handleError(t, event))
                )
                .doOnError(t -> log.error("Exception propagated", t))
                //.log()
                .then()
                .subscribe();
    }

    private Mono<Void> processEvent(Object object) {
        return Mono.error(() -> new RuntimeException("test"));
        //throw new RuntimeException("test");
    }
    
    private Mono<Void> handleError(Throwable throwable, Object object) {
        log.error("Processing Failed - {}", object);
        
        return Mono.empty();
    }
    
}

The output is completely different if the method processEvent returns a Mono.error than if it throws an Exception.

The code as it is (returning a Mono.error), I see what I expected, 300 iterations of Processing and Processins Failed, and I see no Exception propagated.

17:33:19.853 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest

  • Processing - 0 17:33:19.864 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 0 17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 1 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 1 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 2 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 2 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 3 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 3 17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing - 4 17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Processing Failed - 4

On the other hand, if I uncomment the throw, I see a single item from the Flux being processed, I do not see the message from handleError and I see the "Exception Propagated"

17:35:53.950 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest

  • Processing - 0 17:35:53.968 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest
  • Exception propagated java.lang.RuntimeException: test

If this is by design, what are the best practices for the flatMap? On easy solution that comes to mind is to surround th content of the flatMap by a try-catch to wrap the exception in a Mono.error. While it works, it is inelegant and too manual, likely to be forgotten.

Mark Rotteveel
  • 100,966
  • 191
  • 140
  • 197
J_D
  • 3,526
  • 20
  • 31
  • Does this answer your question? [Correct way of throwing exceptions with Reactor](https://stackoverflow.com/questions/53595420/correct-way-of-throwing-exceptions-with-reactor) I think it has great explanation – Vova Bilyachat Aug 27 '21 at 02:13
  • 2
    I did read this post before posting that question, when I was investigating. Unfortunately, I do not think it helps much. My interpretation of that post is basically that it can be summarized as: Don't use exceptions, work with Mono.error. This is already what we are doing, but by definition, exceptions can be thrown from anywhere and saying don't use exceptions is unrealistic. – J_D Aug 27 '21 at 13:23
  • Got it. I will have a look – Vova Bilyachat Aug 28 '21 at 03:14

1 Answers1

3

A method creating/returning a Mono should not throw exception in such way. Since the exception is thrown before Mono is assembled (created), the subsequent operators inside the flatMap can't possibly take effect since they need a Mono to operate on.

If you have no control over the processEvent() method to fix its behaviour then you can wrap it with a Mono.defer which will ensure that even the errors raised during the assembly period will be propagated through the Mono inside the flatMap:

Flux.range(0, 5)
    .doOnNext(event -> log.info("Processing -  {}", event))
    .flatMap(event -> Mono.defer(() -> processEvent(event))
                .doOnSuccess(result -> log.info("Processed - {}", event))
                .onErrorResume(t -> handleError(t, event)))
    .doOnError(t -> log.error("Exception propagated", t))


private Mono<Void> processEvent(Object object) {
    throw new RuntimeException("test");
}

Note that inside other intermediate operators like map or doOnNext you are free to throw exception in the ugly way as Reactor can transform them into proper error signals since at that point a Mono is already in progress.

Martin Tarjányi
  • 8,863
  • 2
  • 31
  • 49