3

Using spring reactive WebClient, I consume an API and in case of response with 500 status I need to retry with exponential backoff. But in Mono class, I don't see any retryBackoff with Predicate as input parameter.

This is the kind of function I search for:

public final Mono<T> retryBackoff(Predicate<? super Throwable> retryMatcher, long numRetries, Duration firstBackoff)

Right now my implementation is as following (I don't have retry with backOff mechanism):

client.sendRequest()
    .retry(e -> ((RestClientException) e).getStatus() == 500)
    .subscribe();
dane131
  • 187
  • 5
  • 16

5 Answers5

4

You might want to have a look at the reactor-extra module in the reactor-addons project. In Maven you can do:

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.2.3.RELEASE</version>
</dependency>

And then use it like this:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryWhen(Retry.onlyIf(ctx -> ctx.exception() instanceof RestClientException)
                    .exponentialBackoff(firstBackoff, maxBackoff)
                    .retryMax(maxRetries))
Johan
  • 37,479
  • 32
  • 149
  • 237
4

Retry.onlyIf is now deprecated/removed.

If anyone is interested in the up-to-date solution:

client.post()
      .syncBody("test")
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(maxRetries, minBackoff).filter(ctx -> {
          return ctx.exception() instanceof RestClientException && ctx.exception().statusCode == 500; 
      }))

It's worth mentioning that retryWhen wraps the source exception into the RetryExhaustedException. If you want to 'restore' the source exception you can use the reactor.core.Exceptions util:

.onErrorResume(throwable -> {
    if (Exceptions.isRetryExhausted(throwable)) {
        throwable = throwable.getCause();
    }
    return Mono.error(throwable);
})
pixel
  • 24,905
  • 36
  • 149
  • 251
Lukasz Blasiak
  • 642
  • 2
  • 10
  • 16
0

I'm not sure, what spring version you are using, in 2.1.4 I have this:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryBackoff(numretries, firstBackoff, maxBackoff, jitterFactor);

... so that's exactly what you want, right?

Frischling
  • 2,100
  • 14
  • 34
0

I'm currently trying it with Kotlin Coroutines + Spring WebFlux:

It seems the following is not working:

suspend fun ClientResponse.asResponse(): ServerResponse =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
        .retryWhen { 
            Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error("Retry for {}", it.exception()) }
        )
        .awaitSingle()
Robert W.
  • 330
  • 3
  • 11
0
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
        .doOnError(e -> { 
            errorCount.incrementAndGet();
            System.out.println(e + " at " + LocalTime.now());
        })
        .retryWhen(Retry
                .backoff(3, Duration.ofMillis(100)).jitter(0d) 
                .doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) 
                .onRetryExhaustedThrow((spec, rs) -> rs.failure()) 
        );

We will log the time of errors emitted by the source and count them.

We configure an exponential backoff retry with at most 3 attempts and no jitter.

We also log the time at which the retry happens, and the retry attempt number (starting from 0).

By default, an Exceptions.retryExhausted exception would be thrown, with the last failure() as a cause. Here we customize that to directly emit the cause as onError.

Brian Brix
  • 449
  • 4
  • 12