0

I'm trying to create a server call using RxJava2 library that will try to poll server for answer and if receives exception 3 times in a row to return that exception

I've set up a basic call that fetches the response from the server

final Observable<ResponseValue> responseValueObservable = Observable
                .fromCallable((Callable) (c) -> return getDispatcher().performSubmit(submitValue);
                    }
                });

return responseValueObservable
           .retry(3)
           .subscribeOn(Schedulers.io()
           .onError((t) -> { log.error(t); Observable.timer(2, SECONDS);}
           .retryUntil(() -> { return retryIsEnabled }

so getDispatcher().performSubmit(submitValue) returns either SubmitException or ResponseValue object.

I need the code to retry 3 times, pausing after each exception for 2 seconds and return either ResponseValue or the last SubmitException

testing_kate
  • 187
  • 2
  • 13

2 Answers2

0

Use the retryWhen() operator to customize the response to errors. From the excellent overview at Dan Lew's Blog:

responseValueObservable
  .retryWhen( errorObservable -> errorObservable
               .zipWith(Observable.range(1, 3), (n, i) -> i)
               .flatMap(retryCount -> Observable.timer(2, TimeUnit.SECONDS)))
  ...
Bob Dalgleish
  • 8,167
  • 4
  • 32
  • 42
0

So after reading Dan Lew's Blog from previous answer I was able to put together this piece of code which does exactly what I wanted to. retryWhen() on re-subscribes automatically after waiting 2 seconds. With the first successful reply from server it stops.

      Observable.fromCallable((d::performSubmit))
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(subscription -> System.out.println("Subscribing"))
            .retryWhen(errors -> {
                AtomicInteger counter = new AtomicInteger();
                return errors
                        .takeWhile(e -> counter.incrementAndGet() < 3)
                        .flatMap(e -> {
                            System.out.println("delay retry by 2 second(s)");
                            return Observable.timer(2, TimeUnit.SECONDS);
                        });
            }).blockingSubscribe(res -> result = Optional.of(res), throwable -> t = Optional.of((Exception) throwable));
testing_kate
  • 187
  • 2
  • 13