0

I am using the new Couchbase Java Client API 2.1.1 and therefore JavaRx to access my Couchbase cluster.
When using asynchronous getAndLock on an already locked document, getAndLock fails with a TemporaryLockFailureException. In another SO question (rxjava: Can I use retry() but with delay?) I found out how to retry with delay.

Here is my adopted code:

    CountDownLatchWithResultData<JsonDocument> resultCdl = new CountDownLatchWithResultData<>(1);

    couchbaseBucket.async().getAndLock(key, LOCK_TIME).retryWhen((errorObserver) -> {
        return errorObserver.flatMap((Throwable t) -> {
            if (t instanceof TemporaryLockFailureException) {
                return Observable.timer(RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
            }
            return Observable.error(t);
        });
    }).subscribe(new Subscriber<JsonDocument>() {

        @Override
        public void onCompleted() {
            resultCdl.countDown();
        }

        @Override
        public void onError(Throwable e) {
            resultCdl.countDown();
        }

        @Override
        public void onNext(JsonDocument t) {
            resultCdl.setInformation(t);
        }

    });

    ........

    resultCdl.await();

    if (resultCdl.getInformation() == null) {
        //do stuff
    } else ....

(CountDownLatchWithResultData simply extends a normal CountDownLatch and adds two methods to store some information before the count has reached 0 and retrieve it afterwards)

So basically I'd like this code to

  • try to get the lock infinitely once every RETRY_DELAY_MS milliseconds if a TemporaryLockFailureException occured and then call onNext
  • or to fail completely on other exceptions
  • or to directly call onNext if there is no exception at all

The problem now is that when retrying, it only retries once and the JsonDocument from resultCdl.getInformation() is always null in this case even though the document exists. It seems onNext is never called.
If there is no exception, the code works fine.
So apparently I am doing something wrong here but I have no clue as to where the problem might be. Does returning Observable.timer imply that with this new Obervable also the previously associated retryWhen is executed again? Is it the CountDownLatch with count 1 getting in the way?

Community
  • 1
  • 1
Sebastian
  • 5,177
  • 4
  • 30
  • 47
  • I try to simulate your issue without couchbase, and only with plain Observable which emits value or errors. I didn't reproduce it. Have you put a breakpoint into onNext() to check if it's called ? – dwursteisen Apr 02 '15 at 14:46

1 Answers1

0

This one is subtle. Up to version 2.2.0, the Observables from the SDK are in the "hot" category. In effect that means that even if no subscription is made, they start emitting. They will also emit the same data to every newcoming Subscriber, so in effect they cache the data.

So what you retry does is resubscribe to an Observable that will always emit the same thing (in this case an error). I suspect it comes out of the retry loop just because the lock maximum duration is LOCK_TIME...

Try wrapping the call to asyncBucket.getAndLock inside an Observable.defer (or migrate to the 2.2.x SDK if that's something you could do, see release and migration notes starting from 2.2.0).

Simon Baslé
  • 27,105
  • 5
  • 69
  • 70