
Tech stack: rx-java 1.1.x, retrofit 1.9.x, spring 4.3.x.

A bit of context: I'm pretty new to rx-java. My service A has a frequently used API endpoint that makes a search call to service B, and that call fails a bit more often than it should. Some of the errors are clear timeouts from other services deeper in the call chain, which took >30s, but quite a lot of them are quick failures that return in under 1s.

What exactly I'm trying to do: Retry only the calls that fail under a given threshold (let's say <1s), ideally only the ones returning 5xx HTTP responses.

Ideas that came to my mind, but do not solve the problem: Regular Observable.timeout() seems of no use, because for now I don't want to touch (interrupt) calls that take longer; I only want to retry those that came back as failed (5xx response). retry() seems of no use, because I don't want to simply retry every failed call. retryWhen() could be of use, but I am not sure how I can extract the HTTP status code from a Throwable, or what exactly I should measure in the Observable call.

Code:

@RestController
@RequestMapping(...)
public class MyController {

@RequestMapping(method = GET)
public DeferredResult<MyJsonWrapper> fetchSomething(
    MySearchRequest searchRequest,
    BindingResult bindingResult,
    HttpServletRequest request) {

    return new MyDeferredResult<>(
        serviceB.searchSomething(...)
            .doOnNext(result -> /* log size of search */));
}
}

serviceB.searchSomething(...) also returns Observable<MyJsonWrapper>

What is MyDeferredResult:

class MyDeferredResult<T> extends DeferredResult<T> {

    public MyDeferredResult(Observable<T> observable) {

        onTimeout(this::handleTimeout);
        ConnectableObservable<T> publication = observable.publish();
        publication.subscribe(this::onNext, this::onError, this::onCompleted);
        publication.connect(subscription -> this.subscription = subscription);
    }

    (...)
    private void handleTimeout() {
        setErrorResult(new MyTimeoutException( /* some info about request */ ));
        subscription.unsubscribe();
    }

How can I retry only the requests that failed under 1s that are 5xx HTTP responses?

Eel Lee

2 Answers


I have been able to implement a working solution. To measure the Observable's time I chose Spring's StopWatch, started counting in doOnSubscribe() and stopped in doOnTerminate().

I create the StopWatch and pass it to my custom retry function used in retryWhen(). Only when execution reaches the retryWhen() function do I check whether the elapsed time was under my given threshold.

What my call looks like now:

StopWatch stopWatch = new StopWatch();
int executionTimeThresholdMillis = 1000; // 1 second

return new MyDeferredResult<>(
    serviceB.searchSomething(...)
        // started on every (re)subscription, stopped when the attempt completes or fails
        .doOnSubscribe(stopWatch::start)
        .doOnTerminate(stopWatch::stop)
        .retryWhen(
            RetryGivenHttpResponsesUnderThreshold.builder()
                .maxRetries(MAX_RETRIES)
                .httpResponsesToRetry(List.of(HTTP_CODE_TO_FAIL))
                .observableExecutionTime(stopWatch)
                .executionTimeThresholdMillis(executionTimeThresholdMillis)
                .build())
        .doOnNext(result -> /* log size of search */));
}
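
The per-attempt timing that doOnSubscribe() and doOnTerminate() provide in the chain above can be sketched in plain Java. This is only a minimal illustration, assuming a hypothetical `AttemptTimer` class (the name and the injected clock are not part of Spring or RxJava): because a retry resubscribes to the source, start and stop run once per attempt, so the timer can expose both the duration of the last attempt and the total across all attempts.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the role StopWatch plays in the chain above.
final class AttemptTimer {

    private final LongSupplier nanoClock;
    private long startedAt;
    private long lastAttemptNanos;
    private long totalNanos;
    private boolean running;

    // The clock is injected so the timer can be tested deterministically.
    AttemptTimer(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    // Would be called from doOnSubscribe(): once per attempt, including retries.
    void start() {
        if (running) {
            throw new IllegalStateException("attempt already running");
        }
        running = true;
        startedAt = nanoClock.getAsLong();
    }

    // Would be called from doOnTerminate(): when the attempt completes or fails.
    void stop() {
        if (!running) {
            throw new IllegalStateException("no attempt running");
        }
        running = false;
        lastAttemptNanos = nanoClock.getAsLong() - startedAt;
        totalNanos += lastAttemptNanos;
    }

    // Duration of the most recent attempt only.
    long lastAttemptMillis() {
        return lastAttemptNanos / 1_000_000L;
    }

    // Sum of all attempts so far.
    long totalMillis() {
        return totalNanos / 1_000_000L;
    }
}
```

In real code the clock would typically be `System::nanoTime`. The retry decision should compare the threshold against the last attempt, since the total keeps growing with every retry.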

Now, an example of how you could implement the retry function. I want to check both the HTTP response code and the elapsed time, so the code is only somewhat configurable. I hope someone else finds it useful and adapts it to their needs:

public class RetryGivenHttpResponsesUnderThreshold implements Func1<Observable<? extends Throwable>, Observable<?>> {


private final int maxRetries;
private final Collection<Integer> httpResponsesToRetry;
private int retryCount;

private final boolean isMeasurable;
private final long maxObservableExecutionTimeMilis;
private final StopWatch stopWatch;

(...) 
// constructors, builders, validations...

@Override
public Observable<?> call(final Observable<? extends Throwable> attempts) {
    return attempts
            .flatMap(throwable -> {
                boolean needsRetry = false;
                if (throwable instanceof HttpException) {
                    if (httpResponsesToRetry.contains(((HttpException) throwable).code())) {

                        // !IMPORTANT! in my case I want to get getLastTaskTimeMillis(), and NOT getTotalTimeMillis()
                        // because the timer will be stopped on every error that will trigger retry
                        final long observableExecutionTimeMilis = stopWatch.getLastTaskTimeMillis();

                        if (isMeasurable) {
                            needsRetry = observableExecutionTimeMilis <= maxObservableExecutionTimeMilis;
                        } else  {
                            needsRetry = true;
                        }
                    }
                }

                if (needsRetry && retryCount < maxRetries) {
                    retryCount++;
                    // Simply retry.
                    return Observable.just(null);
                }

                // Just pass the error along.
                return Observable.error(throwable);
            });
}

}
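
The decision made inside the flatMap above can also be pulled out into a pure function, which makes it easy to unit-test without any RxJava machinery. A minimal sketch, assuming a hypothetical `RetryDecision` helper (the name and signature are illustrative, not part of the class above, and the `isMeasurable` switch is left out for brevity):

```java
import java.util.Collection;

// Hypothetical helper, not part of RxJava, Retrofit or the class above.
final class RetryDecision {

    private RetryDecision() {
    }

    /**
     * Returns true only when the status code is in the retryable set,
     * the failed attempt finished within the threshold,
     * and the retry budget is not yet used up.
     */
    static boolean shouldRetry(int httpStatus,
                               long lastAttemptMillis,
                               long thresholdMillis,
                               Collection<Integer> retryableStatuses,
                               int retriesSoFar,
                               int maxRetries) {
        boolean retryableStatus = retryableStatuses.contains(httpStatus);
        boolean failedFast = lastAttemptMillis <= thresholdMillis;
        return retryableStatus && failedFast && retriesSoFar < maxRetries;
    }
}
```

With a 1000 ms threshold, a 503 that failed after 200 ms would be retried, while a 503 that failed after 31 s or a fast 404 would be passed along as an error.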

Eel Lee

A very simple retry example for when the response status is 500. You can combine further conditions with the status code check (500) and throw an IOException when they all hold, so that the retry mechanism kicks in. Assume THRESOLD is a boolean that is true when the call's elapsed time is under the time threshold.

public void retriesFor500HttpResponse() throws Exception {
    try (CloseableHttpClient httpClient = HttpClients.custom()
            .addInterceptorLast(new HttpResponseInterceptor() {
                @Override
                public void process(HttpResponse response, HttpContext context) throws HttpException, IOException {
                    if (response.getStatusLine().getStatusCode() == 500 && THRESOLD) {
                        throw new IOException("Retry it");
                    }
                }
            })
            .build()) {

        executeFor500Status(httpClient, STATUS_500_URL);
    }
}

private void executeFor500Status(CloseableHttpClient httpClient, String url) throws IOException {

    //....

}
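
In a blocking client, the time condition that this answer calls THRESOLD could be derived by timing each attempt around the call itself. The following is a minimal plain-Java sketch under that assumption; `StatusException` and `FastFailureRetry` are hypothetical names, and the code does not use the retry facilities of HttpClient or RxJava:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical exception carrying an HTTP status code;
// stands in for whatever the real client throws.
final class StatusException extends Exception {
    final int status;

    StatusException(int status) {
        super("HTTP " + status);
        this.status = status;
    }
}

// Hypothetical helper: retries only 5xx failures that came back quickly.
final class FastFailureRetry {

    private FastFailureRetry() {
    }

    static <T> T call(Callable<T> request, int maxRetries, long thresholdMillis) throws Exception {
        int retries = 0;
        while (true) {
            // Time each attempt separately, not the whole retry loop.
            long start = System.nanoTime();
            try {
                return request.call();
            } catch (StatusException e) {
                long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                boolean serverError = e.status >= 500 && e.status <= 599;
                boolean failedFast = elapsedMillis < thresholdMillis;
                if (!(serverError && failedFast) || retries >= maxRetries) {
                    throw e; // slow failure, non-5xx status, or budget exhausted
                }
                retries++;
            }
        }
    }
}
```

A call such as `FastFailureRetry.call(() -> client.search(query), 3, 1000)` (with `client.search` standing in for the real request) would retry a quick 503 up to three times and rethrow anything else immediately.
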
Jimmy
Thank you for the effort to reply, but unfortunately this doesn't answer my question. I need to measure a call that returns an Observable and invoke `retryWhen()`, and the function passed to `retryWhen()` needs to know how long the call actually took. Your answer doesn't provide a solution for that: it focuses only on the error code, not on measuring the time and passing it along. Anyway, thank you for the effort! – Eel Lee Jul 26 '22 at 09:24