
I have a Spring WebClient making HTTP calls to an external service, backed by a reactive circuit breaker factory (Resilience4J implementation). The WebClient and circuit breaker behave as expected when the client establishes a connection and the call fails on the response (any 5XX or 4XX error). However, if the client fails to establish a connection at all, with either Connection Refused or UnknownHost, things start to break down.

  1. I cannot seem to catch the error inside the WebClient chain and use it to trigger the circuit breaker.
  2. The circuit breaker never opens; instead it throws a TimeoutException:
    java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'circuitBreaker' (and no fallback has been configured)

Error from the WebClient:
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9000

Here's my code, with comments marking where each error originates. I tried mapping ConnectException to my custom exception so that the circuit breaker would pick it up, but it did not work. Can someone help me handle errors that occur without any response from the remote server?

public Mono<String> toSink(
    Envelope envelope, ConsumerConfiguration webClientConfiguration) {

  return getWebClient()
      .post()
      .uri(
          uriBuilder -> {
            if (webClientConfiguration.getPort() != null) {
              uriBuilder.port(webClientConfiguration.getPort());
            }
            return uriBuilder.path(webClientConfiguration.getServiceURL()).build();
          })
      .headers(
          httpHeaders ->
              webClientConfiguration.getHttpHeaders().forEach((k, v) -> httpHeaders.add(k, v)))
      .bodyValue(envelope.toString())
      .retrieve()
      .bodyToMono(Map.class)
      // Convert 5XX internal server error and throw CB exception
      .onErrorResume(
          throwable -> {
            log.error("Inside the error resume callback of webclient {}", throwable.toString());
            if (throwable instanceof WebClientResponseException) {
              WebClientResponseException r = (WebClientResponseException) throwable;
              if (r.getStatusCode().is5xxServerError()) {
                return Mono.error(new CircuitBreakerOpenException());
              }
            }
            return Mono.error(new CircuitBreakerOpenException());
          })
      .map(
          map -> {
            log.info("Response map:{}", Any.wrap(map).toString());
            return Status.SUCCESS.name();
          })
      .transform(
          it -> {
            ReactiveCircuitBreaker rcb =
                reactiveCircuitBreakerFactory.create(
                    webClientConfiguration.getCircuitBreakerId());
            return rcb.run(
                it,
                throwable -> {
                  // "Did not observe any item or terminal signal within 1000ms.. " <--- Error here
                  log.info("throwable in CB {}", throwable.toString());
                  if (throwable instanceof CygnusBusinessException) {
                    return Mono.error(throwable);
                  }
                  return Mono.error(
                      new CircuitBreakerOpenException(
                          throwable, new CygnusContext(), null, null, null));
                });
          })
      // io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9000  <-- Error prints here
      .onErrorContinue((throwable, o) -> log.error(throwable.toString()))
      .doOnError(throwable -> log.error("error from webclient:{}", throwable.toString()));
}

Pavan Kumar

2 Answers


I would suggest the following changes to your solution:

1- There is a variant of onErrorContinue that accepts a predicate, so you can define which exceptions the operator applies to (see the Reactor docs).

2- Return a Mono.error instead of throwing RuntimeExceptions from Mono/Flux operators. Another Stack Overflow answer covers this quite well.

3- Perform logging with side-effect operators (doOn*).

.doOnError(throwable -> log.info("throwable => {}", throwable.toString()))
.onErrorResume(
    throwable -> throwable instanceof ReadTimeoutException || throwable instanceof ConnectException,
    t -> Mono.error(new CircuitBreakerOpenException()))
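
The predicate in the snippet above could also be factored into a small helper that walks the cause chain, in case the connect error arrives wrapped in another exception. The following is only a sketch in plain Java: `ConnectionFailures`, `isConnectionFailure`, `CONNECTION_FAILURE` and `MAX_DEPTH` are hypothetical names, not part of Spring or Reactor. It relies on Netty's `AnnotatedConnectException` being a subclass of `java.net.ConnectException`, which is my understanding but worth verifying against your Netty version. It does not cover Netty's `ReadTimeoutException`, which would still need its own `instanceof` check.

```java
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.util.function.Predicate;

/** Hypothetical helper (not a Spring or Reactor class) for spotting connection-level failures. */
final class ConnectionFailures {

  /** Upper bound on how many causes are inspected, to guard against cyclic cause chains. */
  private static final int MAX_DEPTH = 16;

  /** Predicate form of isConnectionFailure, usable wherever a Predicate<Throwable> is accepted. */
  static final Predicate<Throwable> CONNECTION_FAILURE = ConnectionFailures::isConnectionFailure;

  private ConnectionFailures() {}

  /** Returns true if the throwable, or one of its causes, is a connect or DNS lookup failure. */
  static boolean isConnectionFailure(Throwable error) {
    Throwable current = error;
    for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
      if (current instanceof ConnectException || current instanceof UnknownHostException) {
        return true;
      }
      current = current.getCause();
    }
    return false;
  }

  public static void main(String[] args) {
    Throwable wrapped =
        new RuntimeException("request failed", new ConnectException("Connection refused"));
    System.out.println(isConnectionFailure(wrapped)); // prints true
    System.out.println(isConnectionFailure(new IllegalStateException("bad body"))); // prints false
  }
}
```

Assuming this helper, the first argument of the two-argument onErrorResume call could be replaced with `ConnectionFailures.CONNECTION_FAILURE`, keeping the same `t -> Mono.error(new CircuitBreakerOpenException())` fallback.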

Hope this is helpful.

Michael McFadyen
    I applied this solution and, though it works, it had some side effects with the circuit breaker. I still get the following TimeoutException: ```java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'circuitBreaker' (and no fallback has been configured)```. Also, any exception thrown before the request is made (e.g. an incorrect body for the POST) is not caught. – Pavan Kumar Jan 23 '21 at 23:19

I fixed it by adding an onErrorContinue block and re-throwing the exception as a custom exception that gets handled in my circuit breaker code.

.onErrorContinue(
        (throwable, o) -> {
          log.info("throwable => {}", throwable.toString());
          if (throwable instanceof ReadTimeoutException || throwable instanceof ConnectException) {
            throw new CircuitBreakerOpenException();
          }
        })
Pavan Kumar