I have an API that takes an Observable that triggers an event.

I want to return an Observable that emits a value every defaultDelay seconds while an Internet connection is detected, and that waits numberOfFailedAttempts^2 seconds between attempts when there's no connection.

I've tried several approaches. The biggest problem I'm having is that retryWhen's observable is only evaluated once:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
             }
         }).onBackpressureDrop())
    .onBackpressureDrop();

Is there any way to do what I'm attempting to do? I found a related question (can't find it searching right now), but the approach taken didn't seem to work with a dynamic value.

Maksim Ostrovidov
Selali Adobor

3 Answers

In your code there are two mistakes:

  1. To repeat an observable sequence, that sequence has to be finite, i.e. it has to complete. So instead of interval, use something like just, or fromCallable as I did in the sample below.
  2. From repeatWhen's inner function you need to return a new delayed observable source, so instead of observable.delay() you have to return Observable.timer().

Working code:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}
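
The delay computed inside flatMap above can be pulled out into a pure function, which makes the resulting schedule easy to check without running any Rx code. This is only a minimal sketch: the class and method names (BackoffSchedule, delayMillis) are hypothetical, and the two constants are assumed to mirror DEFAULT_DELAY and ADDITIONAL_DELAY from the sample.

```java
final class BackoffSchedule {
    // Assumed to match DEFAULT_DELAY and ADDITIONAL_DELAY in the sample above (both in ms).
    static final int DEFAULT_DELAY_MS = 100;
    static final int ADDITIONAL_DELAY_MS = 100;

    /**
     * Delay before the next repeat, given how many consecutive attempts
     * found the connection down (0 means the connection is alive).
     */
    static long delayMillis(int consecutiveFailures) {
        if (consecutiveFailures < 0) {
            throw new IllegalArgumentException("consecutiveFailures must be >= 0");
        }
        // Quadratic backoff: base delay plus ADDITIONAL_DELAY * failures^2.
        return DEFAULT_DELAY_MS
                + (long) ADDITIONAL_DELAY_MS * consecutiveFailures * consecutiveFailures;
    }

    public static void main(String[] args) {
        for (int failures = 0; failures <= 4; failures++) {
            System.out.println("failures=" + failures + " -> " + delayMillis(failures) + "ms");
        }
    }
}
```

With these constants the waits come out as 100, 200, 500, 1000 and 1700 ms for 0 to 4 consecutive failures, which should line up with the additionalDelay values the test logs.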

See detailed article about repeatWhen here.

Yaroslav Stavnichiy
  • The sample in the question was probably from the middle of my attempts, since it seems to mix two approaches I used (one timer + retry based, one interval + delayed subscription based). The problem actually came from that article, which says that the input observable to retry/repeat should be used again. Does not using that observable cause issues with leaking subscriptions? – Selali Adobor Jan 17 '17 at 14:05
  • @AssortedTrailmix That was about the first level input, not about inner `flatMap`. See the last example in that article for very similar pattern. – Yaroslav Stavnichiy Jan 17 '17 at 14:30
  • ohh I see, sorry I missed that `counts` was what was getting flatMap'ed – Selali Adobor Jan 17 '17 at 14:57

You can use the retryWhen operator to configure the delay when there's no connection. How to periodically emit items is a separate topic (look up interval or timer operators). Open a separate question if you can't figure it out.

I have an extensive example on my GitHub, but I'll give you the gist here.

RetryWithDelay retryWithDelay = RetryWithDelay.builder()
    .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
    .build();

Single.fromCallable(() -> {
    ...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
    ...
});

RetryWithDelay is defined as follows. I used RxJava 2.x, so if you're using 1.x, the signature should be Func1<Observable<? extends Throwable>, Observable<Object>>.

public class RetryWithDelay implements
        Function<Flowable<? extends Throwable>, Publisher<Object>> {
    ...
}

RetryWithDelay class.

RetryStrategy enum.

This allows me to configure various sorts of timeouts (constant, linear, exponential) based on the RetryDelayStrategy. For your use case, you'd choose the CONSTANT_DELAY_TIMES_RETRY_COUNT delay strategy and call retryDelaySeconds(2) when building RetryWithDelay.
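
To make the difference between those delay strategies concrete, here is a rough, self-contained sketch of how such a strategy enum might compute its delays. Everything in it is hypothetical (DelayStrategyDemo, Strategy, delaySeconds); it is not the RetryWithDelay or RetryDelayStrategy code from the repository, whose body is elided above. LINEAR is meant to mirror what the name CONSTANT_DELAY_TIMES_RETRY_COUNT suggests, which is an assumption.

```java
final class DelayStrategyDemo {

    /** Hypothetical strategies for illustration; not the enum from the answer's repository. */
    enum Strategy {
        /** Always wait the base delay. */
        CONSTANT {
            @Override
            long delaySeconds(long baseSeconds, int retryCount) {
                return baseSeconds;
            }
        },
        /** Wait base * retryCount, e.g. 2s, 4s, 6s for a base of 2. */
        LINEAR {
            @Override
            long delaySeconds(long baseSeconds, int retryCount) {
                return baseSeconds * retryCount;
            }
        },
        /** Wait base * 2^(retryCount - 1); the exponent is clamped to 0..30 in this sketch. */
        EXPONENTIAL {
            @Override
            long delaySeconds(long baseSeconds, int retryCount) {
                int exponent = Math.min(Math.max(retryCount - 1, 0), 30);
                return baseSeconds * (1L << exponent);
            }
        };

        /** retryCount is 1-based: 1 for the first retry. */
        abstract long delaySeconds(long baseSeconds, int retryCount);
    }

    public static void main(String[] args) {
        // Print the first four delays of each strategy for a base of 2 seconds.
        for (Strategy s : Strategy.values()) {
            StringBuilder line = new StringBuilder(s + ":");
            for (int retry = 1; retry <= 4; retry++) {
                line.append(' ').append(s.delaySeconds(2, retry)).append('s');
            }
            System.out.println(line);
        }
    }
}
```

In a retryWhen handler, a value like this would typically be fed into a timer for each error notification; the sketch leaves that wiring out on purpose.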

retryWhen is a complicated, perhaps even buggy, operator. Most examples you'll find online use the range operator, which will fail if there are no retries to be made. See my answer here for details.

Homayoon Ahmadi
Abhijit Sarkar

I've always found retryWhen to be somewhat low-level, so for exponential backoff I use a builder (like Abhijit) that is unit tested and available for RxJava 1.x at rxjava-extras. I'd suggest using a capped version so that the exponential increase of delay won't go beyond a maximum value you define.

This is how you use it:

observable.retryWhen(
    RetryWhen.exponentialBackoff(
        delay, maxDelay, TimeUnit.SECONDS)
    .build());
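
For a feel of what capping does to the schedule, here is a small stand-alone sketch of the arithmetic. It is not how rxjava-extras implements it: the class CappedBackoff, the method delay, and the doubling factor of 2 are all assumptions made for illustration.

```java
final class CappedBackoff {

    /**
     * Delay for the given 1-based attempt: firstDelay doubled on each further
     * attempt, but never larger than maxDelay. The factor of 2 is an assumption.
     */
    static long delay(long firstDelay, long maxDelay, int attempt) {
        if (firstDelay <= 0 || maxDelay < firstDelay || attempt < 1) {
            throw new IllegalArgumentException("need 0 < firstDelay <= maxDelay and attempt >= 1");
        }
        long delay = firstDelay;
        for (int i = 1; i < attempt && delay < maxDelay; i++) {
            // Compare against maxDelay / 2 before doubling so the multiplication cannot overflow.
            delay = (delay > maxDelay / 2) ? maxDelay : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        // With firstDelay = 1 and maxDelay = 30, the delay stops growing once it reaches 30.
        for (int attempt = 1; attempt <= 7; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delay(1, 30, attempt) + "s");
        }
    }
}
```

With a first delay of 1 and a cap of 30, the sequence is 1, 2, 4, 8, 16, 30, 30: the uncapped value of 32 on the sixth attempt is clamped to the maximum.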

I disagree that retryWhen is buggy, but if you find a bug, report it to RxJava. Bugs are fixed fast!

You'll need rxjava-extras 0.8.0.6 or later which is on Maven Central:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.8.0.6</version>
</dependency>

Let me know if you need the RxJava 2.x version. The same functionality is available in rxjava2-extras from 0.1.4.

Dave Moten
  • I knew I'd seen this somewhere! I don't want to reinvent the wheel here, so I'm probably going to go with this and look at the implementation to see how I should have gone about it – Selali Adobor Jan 17 '17 at 14:06
  • Today I noticed I forgot to implement the max back off, but it looks like that method signature doesn't exist for me when using version 0.8.0.6 – Selali Adobor Jan 20 '17 at 19:15
  • I was in a hurry the other day, so I told myself I'd come back to the piece of code I needed it for. It seems this solution doesn't have the behavior I expected, which is that the retry should reset after a successful invocation (which makes sense, since that requires "out-of-band" communication). I think the repeatWhen approach outlined below is what I need in my current situation; this approach seems optimized for the more common situation of "retry until it works" vs "retry always, and delay longer if it doesn't work" – Selali Adobor Jan 20 '17 at 20:14
  • What about `repeat().retryWhen()` to meet your retry always requirement? – Dave Moten Jan 25 '17 at 11:38
  • That's what I went with based on the answer with the retryWhen observable + flatMap. I think the two things I was missing were to flatMap on the retryWhen observable instead of just returning a new one, and to *not* return the source observable from inside flatMap (that caused a subtle memory leak and didn't work correctly) – Selali Adobor Jan 25 '17 at 14:57