Let's say I have an array of ids: [9, 8, 7, 6].

I do some processing, and one element causes an exception to be thrown. I want to handle this situation in my own way (let's say log it) and let the other elements continue through the flow.

How can I know which element it was? I need access to this element in my onError processing.

Flux.fromArray(myArray)
  .flatMap(element -> {
    var foo = processMyEl(element);  
    return anotherProcess(foo); // this returns Mono
  })
  .onErrorOperator(element -> handleMyError(element)) // this line is what I need
  

From what I saw, there's the almost-suitable .onErrorContinue((error, obj) -> ...), which passes both the error and an object to the handler.

But this obj is not the element from my array; it is whatever object was being processed at the point where the exception was raised. That happens inside my processing methods, and it doesn't have to be the same type of object every time.

.onErrorReturn(...) - not really what I want

.doOnError(error -> - no information about my element

.onErrorResume(error -> - same as above

There were suggestions that I could create my own exception, pass the element to it, and then retrieve the element from the exception. But how should I throw the exception?

Should I go the old way, with try-catch:

Flux.fromArray(myArray)
  .flatMap(el -> {
    try {
      var foo = processMyEl(el);  
      return anotherProcess(foo); // this returns Mono
    } catch (Exception e) {
      return Mono.error(new MyException(el));
    }
  })
  .onErrorOperator(error -> handleMyError(error.getElement()))

It doesn't look good.
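
The MyException class used in the snippet above is not shown in the question. A minimal sketch, assuming it only needs to carry the failing element and optionally the original cause, could look like the following. Extending RuntimeException is an assumption made so that it can be thrown from a lambda; only the getElement() accessor comes from the snippet above.

```java
// Hypothetical exception type that remembers which element failed.
class MyException extends RuntimeException {
    private final Object element;

    // Keeps the original failure as the cause so its stack trace is not lost.
    MyException(Object element, Throwable cause) {
        super("Processing failed for element: " + element, cause);
        this.element = element;
    }

    // Convenience constructor for when there is no underlying cause.
    MyException(Object element) {
        this(element, null);
    }

    // The element that was being processed when the failure happened.
    Object getElement() {
        return element;
    }
}
```

An error handler that receives this exception can then call getElement() to find out which id failed.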

Edit:

Not only does it look bad, it also doesn't work. The exception is not caught at all; it directly triggers doOnTerminate() and stops the whole stream.

Update:

Thanks to @JEY I used .onErrorResume() inside flatMap.

I also turned the first method into a reactive stream with Mono.defer(() -> Mono.just(processMyEl(el))).

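Just as a note: Mono.defer() is what allows me to use onErrorResume() here. With a plain Mono.just(processMyEl(el)), processMyEl runs before any Mono exists, so Mono.just() never gets a chance to signal the error; the exception is thrown straight out of the flatMap lambda instead of going through the inner Mono.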
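
The eager-versus-deferred difference can be illustrated without Reactor. The sketch below is only an analogy: java.util.function.Supplier stands in for a lazy Mono, and processMyEl with its failure for id 7 is made up for the example.

```java
import java.util.function.Supplier;

class EagerVsDeferred {
    // Hypothetical stand-in for processMyEl: fails for id 7.
    static int processMyEl(int id) {
        if (id == 7) {
            throw new IllegalStateException("cannot process " + id);
        }
        return id * 10;
    }

    // Eager, comparable to Mono.just(processMyEl(el)): the call runs while
    // the wrapper is being built, so a failure escapes before the wrapper exists.
    static Supplier<Integer> eager(int id) {
        int value = processMyEl(id);
        return () -> value;
    }

    // Deferred, comparable to Mono.defer(() -> Mono.just(processMyEl(el))):
    // the call runs only when the wrapper is used, so a failure surfaces there.
    static Supplier<Integer> deferred(int id) {
        return () -> processMyEl(id);
    }

    public static void main(String[] args) {
        try {
            eager(7);
        } catch (IllegalStateException e) {
            System.out.println("eager: failed while building the wrapper");
        }

        Supplier<Integer> lazy = deferred(7); // nothing is thrown yet
        try {
            lazy.get();
        } catch (IllegalStateException e) {
            System.out.println("deferred: failed only when the value was requested");
        }
    }
}
```

In the deferred case the failure happens inside the wrapper, which is the place where an operator attached to that wrapper (onErrorResume in the Reactor code) can react to it.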

Final code looks like this:

Flux.fromArray(myArray)
    .flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
        .onErrorResume(th -> handleMyError(element, th))
    )
    .flatMap(foo -> anotherProcess(foo)
        .onErrorResume(th -> handleMyError(foo, th))
    )

Where:

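// Generic so that it fits both flatMap calls above, whatever type they emit.
private <T> Mono<T> handleMyError(Object el, Throwable th) {
  // handling code, e.g. log el together with th
  return Mono.empty(); // completes without a value, so the failed element is skipped
}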
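
For readers without Reactor on the classpath, the same idea (attach the recovery step per element, inside a lambda that still has the element in scope) can be sketched with the JDK's CompletableFuture. This is an analogy rather than Reactor code; processMyEl, the failure for id 7, and the failedIds list are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class PerElementRecovery {
    // Hypothetical stand-in for processMyEl: fails for id 7.
    static int processMyEl(int id) {
        if (id == 7) {
            throw new IllegalStateException("cannot process " + id);
        }
        return id * 10;
    }

    // Processes every id; a failure is recorded against its id and skipped.
    static List<Integer> processAll(List<Integer> ids, List<Integer> failedIds) {
        List<Integer> results = new ArrayList<>();
        for (Integer id : ids) {
            Optional<Integer> outcome = CompletableFuture.completedFuture(id)
                    .thenApply(PerElementRecovery::processMyEl)
                    .thenApply(Optional::of)
                    // The lambda closes over id, so the handler knows which element failed.
                    .exceptionally(th -> {
                        failedIds.add(id);
                        return Optional.empty();
                    })
                    .join();
            outcome.ifPresent(results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> failedIds = new ArrayList<>();
        List<Integer> results = processAll(List.of(9, 8, 7, 6), failedIds);
        System.out.println("results: " + results + ", failed ids: " + failedIds);
    }
}
```

Because the recovery step is attached to each element's own pipeline rather than to the outer loop, one failing id does not stop the remaining ids from being processed.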
Kamil
  • You should go with your old try-catch, but don't return a Mono.error(): handle your error and return Mono.empty(), so that the error won't be seen downstream and the Flux will keep being consumed. – JEY Dec 01 '20 at 10:42
  • yeah, I tried to have my logic in catch clause but the code doesn't even reach there, just terminates my stream and that's all – Kamil Dec 01 '20 at 12:55
  • if it doesn't reach it, it means that the error is thrown somewhere else. Without more info we won't be able to help. – JEY Dec 01 '20 at 13:00
  • no, it's being thrown inside one of those two methods, but since one method returns `Mono` - as I mentioned in the question, and the exception can be thrown there, it's not going to be caught by try catch since it's not in its scope anymore – Kamil Dec 01 '20 at 13:21
  • don't use a try-catch then, but anotherProcess.onErrorResume(th -> { LOG.error("error", th); return Mono.empty(); }) – JEY Dec 01 '20 at 13:31
  • and how can I get my element there? – Kamil Dec 01 '20 at 13:35
  • You have direct access to it: anotherProcess.onErrorResume(th -> { LOG.error("error with element {}", element, th); return Mono.empty(); }) or anotherProcess.onErrorResume(th -> { LOG.error("error with element {}", element, th); return handleMyError(element).then(); }) – JEY Dec 01 '20 at 13:45
  • you're right! what I also did was to get rid of try catch entirely and transform to reactive stream by `Mono.just(processMyEl(el))` and then apply `onErrorResume` like you suggested to both of them, feel free to post an answer and I'll accept it; I'll also post my final code – Kamil Dec 01 '20 at 14:41
  • alright then glad to help. – JEY Dec 01 '20 at 14:49

2 Answers

3

As requested by @Kamil I'll add my comments as an answer:

You should just handle the error inside the flatMap and return Mono.empty() to discard it. Do something like:

Flux.fromArray(myArray)
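  // Mono.defer also routes an exception thrown synchronously by processMyEl into onErrorResume
  .flatMap(el -> Mono.defer(() -> anotherProcess(processMyEl(el)))
      .onErrorResume(th -> handleError(th, el)))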

With handleError defined like:

// Generic so that the result fits whatever type anotherProcess emits.
<T> Mono<T> handleError(Throwable th, Object element) {
    LOG.error("An error occurred on {}", element, th);
    return Mono.empty();
}

Or, if you want to do something more complex that requires asynchronous work:

<T> Mono<T> handleError(Throwable th, Object element) {
    // Run the async handling, then complete empty so the failed element is dropped.
    return doSomethingThatReturnFluxOrMono(element).then(Mono.empty());
}
JEY
-1
} catch (Exception e) {
    throw new MyException(el, e);
}
Alexei Kaigorodov
  • I cannot throw an exception in catch clause in a flatMap, having `return Mono.error(new MyException(el))` or `return Flux.error(new MyException(el))` is the right way to signal an exception down the stream – Kamil Dec 01 '20 at 12:52