1

Can someone please guide me as I am new to the flux and trying to understand how to handle this scenario?

Issue: I am getting readTimeout exception in one of the flux responses from getResp() method below and then all the prior successful responses are ignored and the exception error is returned.

Instead, I want to return all the successful responses I received prior to the exception.

public Flux<CustomObject1> getInfo(List<RequestObj> requestObjList) {
return requestObjList.stream()
       .parallel()
       .map(this::getResp)
       .reduce(Flux::Merge)
       .orElse(Flux.empty());
}

public Flux<CustomObject1> getResp(RequestObj requestObj){
// process the request and return ...        

}

Please let me know if this is not clear, happy to provide more details.

Uwe Allner
  • 3,399
  • 9
  • 35
  • 49

1 Answers1

1

There are several ways to handle errors in flatMap

Use flatMapDelayError that will delay any error until all elements are processed

public Flux<CustomObject1> getInfo(List<RequestObj> requestObjList) {
    return Flux.fromIterable(requestObjList)
            .flatMapDelayError(this::getResp, Queues.SMALL_BUFFER_SIZE, Queues.XS_BUFFER_SIZE)
            .onErrorResume(e -> {
                // log error
                return Mono.empty();
            });
}

Handle error for every element

public Flux<CustomObject1> getInfo(List<RequestObj> requestObjList) {
    return Flux.fromIterable(requestObjList)
            .flatMap(request -> 
                    getResp(request)
                            .onErrorResume(e -> {
                                // log error
                                return Mono.empty();
                            })
            );

Alex
  • 4,987
  • 1
  • 8
  • 26
  • Thank you Alex for answering my question. For now, this is how I handled... public Flux getInfo(List requestObjList) { return requestObjList.stream() .parallel() .map(this::getResp) .reduce(Flux::Merge) .orElse(Flux.empty()); } public Flux getResp(RequestObj requestObj){ // process the request and return ... } – sushant saini Apr 13 '22 at 08:40
  • from what I see `getResp` returns `Flux` and you can't use java streams to resolve publishers. `.map(this::getResp)` will not execute `getResp` because in reactive "nothing happens until you subscribe". You need to construct flow using Reactor API and use different operators such `flatMap` to resolve publishers. Check my examples for details. – Alex Apr 13 '22 at 14:51