2

I'm implementing a DB update approach with retrials.. Following the common pattern for retryWhen() operator as explained here: Using Rx Java retryWhen() ..

..But my retry logic never executes. I'm debugging it and can see the breakpoint hitting at place 3 shown below but it never goes back to retry logic at place 2. After place 3, its always going to place 4 which is the onComplete handler.

(Code is using Java 8 lambdas)

I've applied a workaround by removing the retryWhen() block altogether and now invoking the updateWithRetrials() recursively from subscribe's > onError() block. That is working but I don't like that approach.

Please can anyone suggest what is incorrect when I use retryWhen() operator ?

private void updateWithRetrials(some input x)

{

   AtomicBoolean retryingUpdate = new AtomicBoolean(false);

   ...  

   // 1- Start from here
   Observable.<JsonDocument> just(x).map(x1 -> {

       if (retryingUpdate.get())
       {
          //2. retry logic
       }

       //doing sth with x1 here
       ...
       return <some observable>;

   })
   .retryWhen(attempts -> attempts.flatMap(n -> {

       Throwable cause = n.getThrowable();

       if (cause instanceof <errors of interest>)
       {
          // 3 - break-point hits here

          // retry update in 1 sec again
          retryingUpdate.set(true);
          return Observable.timer(1, TimeUnit.SECONDS);
       }

       // fail in all other cases...
       return Observable.error(n.getThrowable());
   }))
   .subscribe(
          doc -> {
                    //.. update was successful   
                 },

          onError -> {
                    //for unhandled errors in retryWhen() block
                  },

              {
                // 4. onComplete block

                 Sysout("Update() call completed.");
              }

     ); //subscribe ends here

}
Community
  • 1
  • 1
NitinS
  • 322
  • 2
  • 12

2 Answers2

2

Your problem is due to some performance optimisation with Observable.just().

This Operator, after emmiting the item, does not check if the subscribtion is not cancelled and sends onComplete on all cases.

Observable.retryWhen (and retry) resubscribes on Error, but terminates when source sends onComplete.

Thus, even if the retry operator resubscribes, it gets onComplete from previous subscription and stops.

You may see, that code below fails (as yours):

@Test
public void testJustAndRetry() throws Exception {
        AtomicBoolean throwException = new AtomicBoolean(true);
        int value = Observable.just(1).map(v->{
            if( throwException.compareAndSet(true, false) ){
                throw new RuntimeException();
            }
            return v;
        }).retry(1).toBlocking().single();
    }

But if you "don't forget" to check subscription, it Works!:

@Test
public void testCustomJust() throws Exception {
    AtomicBoolean throwException = new AtomicBoolean(true);
    int value = Observable.create((Subscriber<? super Integer> s) -> {
                s.onNext(1);
                if (!s.isUnsubscribed()) {
                    s.onCompleted();
                }
            }
    ).map(v -> {
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException();
        }
        return v;
    }).retry(1).toBlocking().single();

    Assert.assertEquals(1, value);
}
Marek Hawrylczak
  • 2,424
  • 15
  • 7
  • This looks like a good approach though my question was related to 'retryWhen'. Thanks for such a nice explanation anyways. – NitinS Apr 03 '15 at 19:02
  • problem affects both functions - retry and retryWhen; I used retry for simplicity – Marek Hawrylczak Apr 08 '15 at 06:55
  • @MarekHawrylczak Thank you for your explaining. Exactly ! So as you said , there is no way to do retryWhen() with just(). – Bulma Dec 18 '18 at 04:24
1

I suppose the error occurs inside map because it cannot occur in just. This is not how retryWhen works.

Implement your observable using create and make sure no errors occur in map. If any error will be thrown in the create block the retryWhen will be called and the unit of work retried depending on your retry logic.

    Observable.create(subscriber -> {
        // code that may throw exceptions
    }).map(item -> { 
        // code that will not throw any exceptions
    }).retryWhen(...)
      ...
tomrozb
  • 25,773
  • 31
  • 101
  • 122
  • Accepting this answer. I was able to see retrials when used the 'create' method. But I observed that if I add a 'subscriber' to the result of 'retryWhen', the onNext and onComlpete are never called on that subscriber even though the actions taken in 'create' were successfully completed after retrials. – NitinS Apr 03 '15 at 19:00