Consider the following code:
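    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.mockito.junit.jupiter.MockitoExtension;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    @Slf4j
    @ExtendWith(MockitoExtension.class)
    class ConnectionEventsConsumerTest {

        @Test
        public void testOnErrorResume() {
            Flux.range(0, 5)
                .doOnNext(event -> log.info("Processing - {}", event))
                .flatMap(event -> processEvent(event)
                    .doOnSuccess(result -> log.info("Processed - {}", event))
                    // recover per item so that one failure does not cancel the whole Flux
                    .onErrorResume(t -> handleError(t, event))
                )
                .doOnError(t -> log.error("Exception propagated", t))
                //.log()
                .then()
                .subscribe();
        }

        private Mono<Void> processEvent(Object object) {
            // variant 1: signal the failure through the returned Mono
            return Mono.error(() -> new RuntimeException("test"));
            // variant 2: throw synchronously instead of returning a Mono
            //throw new RuntimeException("test");
        }

        private Mono<Void> handleError(Throwable throwable, Object object) {
            log.error("Processing Failed - {}", object);
            return Mono.empty();
        }
    }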
The output is completely different depending on whether processEvent returns a Mono.error or throws an exception.
With the code as shown (returning a Mono.error), I see what I expected: 5 iterations of "Processing" followed by "Processing Failed", and no "Exception propagated":
    17:33:19.853 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 0
    17:33:19.864 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 0
    17:33:19.865 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 1
    17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 1
    17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 2
    17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 2
    17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 3
    17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 3
    17:33:19.866 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 4
    17:33:19.866 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing Failed - 4
On the other hand, if I uncomment the throw, only a single item from the Flux is processed, the message from handleError never appears, and I see "Exception propagated":
    17:35:53.950 [main] INFO com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Processing - 0
    17:35:53.968 [main] ERROR com.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEventsConsumerTest - Exception propagated
    java.lang.RuntimeException: test
If this is by design, what are the best practices for flatMap? One easy solution that comes to mind is to surround the content of the flatMap with a try-catch that wraps the exception in a Mono.error. While it works, it is inelegant, too manual, and likely to be forgotten.
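To make the workaround concrete, here is a minimal sketch of what I mean, assuming reactor-core is on the classpath. The class and method names (FlatMapThrowSketch, viaTryCatch, viaDefer, run) are made up for illustration. The second variant uses Mono.defer, which as far as I understand converts an exception thrown by its supplier into an error signal; I include it only as a possibly more compact alternative, not as a confirmed best practice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlatMapThrowSketch {

    // Stand-in for processEvent in its throwing variant.
    static Mono<Void> processEvent(Object event) {
        throw new RuntimeException("test");
    }

    // The manual workaround: catch the synchronous throw and return it as Mono.error.
    static Mono<Void> viaTryCatch(Object event) {
        try {
            return processEvent(event);
        } catch (RuntimeException e) {
            return Mono.error(e);
        }
    }

    // Assumed alternative: defer the call so the throw happens at subscription time,
    // inside the Mono, where onErrorResume can see it.
    static Mono<Void> viaDefer(Object event) {
        return Mono.defer(() -> processEvent(event));
    }

    // Runs the same pipeline shape as the test and collects the events
    // whose failure reached onErrorResume.
    static List<Integer> run(Function<Integer, Mono<Void>> safeCall) {
        List<Integer> handled = new ArrayList<>();
        Flux.range(0, 5)
            .flatMap(event -> safeCall.apply(event)
                .onErrorResume(t -> {
                    handled.add(event);
                    return Mono.empty();
                }))
            .then()
            .block();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(FlatMapThrowSketch::viaTryCatch));
        System.out.println(run(FlatMapThrowSketch::viaDefer));
    }
}
```

With either wrapper, all five events should reach the per-item error handler instead of the first throw terminating the Flux. The drawback I mentioned still applies: every call site inside a flatMap has to remember to use the wrapper.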