I have an Observable<FeaturedItemList> getFeatured() that is called every time the page is opened. The function is called from two different components on the same page. Since it retrieves data from the network, I cache the result and make it shareable with a ReplaySubject.

public Observable<FeaturedItemList> getFeatured() {
    if(mFeaturedReplaySubject == null) {
        mFeaturedReplaySubject = ReplaySubject.create();
        getFromNetwork().subscribe(mFeaturedReplaySubject);
    }

    return mFeaturedReplaySubject;
}

Then I realized that when the request fails for some reason and the user comes back to that page, it will not show any results unless the user kills the app. So I decided to add some retry logic. Here's what I did:

public Observable<FeaturedItemList> getFeatured() {
    synchronized (this) {
        if (mFeaturedReplaySubject == null) {
            mFeaturedReplaySubject = ReplaySubject.create();
            getFromNetwork().subscribe(mFeaturedReplaySubject);

            return mFeaturedReplaySubject;
        } else {
            return mFeaturedReplaySubject.onErrorResumeNext(throwable -> {
                mFeaturedReplaySubject = null;
                return getFeatured();
            });
        }
    }
}

While this works, I'm afraid I'm doing something wrong here, or that there's a case this approach won't cover. Is there a better approach?

Also, for sharing the observable instead of using a subject, I read somewhere that I can use connect(), publish(), and share(), but I'm not sure how to use them.

Fadli

1 Answer

The code

private Observable<FeaturedItemList> mFeatured =
    // initialized on construction
    getFromNetwork()
        .retry(3) // number of times to retry
        .cache();

public Observable<FeaturedItemList> getFeatured() {
    return mFeatured;
}

Explanation

Network source

Your getFromNetwork() function should return a regular (cold) observable that accesses the network each time it is subscribed to. It should not access the network when the function itself is invoked. For example:

Future<FeaturedItemList> makeAsyncNetworkRequest() {
    ... initiate network request here ...
}

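Observable<FeaturedItemList> getFromNetwork() {
    // fromCallable defers the call until someone subscribes
    return Observable.fromCallable(this::makeAsyncNetworkRequest)
        // unwrap the Future into the item it eventually produces
        .flatMap(Observable::fromFuture);
}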

Retry

There is a family of .retryXxx() operators that are activated only on errors. Depending on their conditions, they either re-subscribe to the source or pass the error down the line. When there is no error, these operators do nothing. I used the simple retry(count) in my example; it retries up to the specified number of times, without delay. You can add a delay or any more complicated logic using retryWhen() (see here and here for examples).
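To make the counting concrete, here is a minimal plain-Java model of what retry(count) does. It is not RxJava code: callWithRetry and flakySource are hypothetical names, and a Supplier stands in for the source observable. The point it illustrates is that retry(3) allows up to three re-subscriptions, so up to four attempts in total.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java model of retry(count); a sketch, not RxJava itself. */
final class RetrySketch {

    /**
     * Runs the source once, then re-runs it up to maxRetries more times
     * while it keeps failing. A success is returned untouched.
     */
    static <T> T callWithRetry(Supplier<T> source, int maxRetries) {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return source.get();           // success: pass the value through
            } catch (RuntimeException e) {
                if (retriesLeft-- <= 0) {
                    throw e;                   // retries exhausted: pass the error down
                }
                // otherwise loop again, which stands in for re-subscribing
            }
        }
    }

    /** Hypothetical flaky source: fails the first N calls, then succeeds. */
    static Supplier<String> flakySource(int failuresBeforeSuccess, AtomicInteger attempts) {
        return () -> {
            if (attempts.incrementAndGet() <= failuresBeforeSuccess) {
                throw new IllegalStateException("network down");
            }
            return "featured";
        };
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String result = callWithRetry(flakySource(2, attempts), 3);
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

In real RxJava the retry happens by re-subscribing asynchronously rather than by looping, but the attempt count follows the same rule.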

Cache

The cache() operator records the sequence of events and replays it to every new subscriber. The downside is that it is not refreshable: it stores the outcome of the upstream forever, whether success or error, and never subscribes to the upstream again.
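The "stores the outcome forever" behaviour can be modelled in a few lines of plain Java. This is a sketch under simplifying assumptions, not RxJava's implementation: CachedOutcome is a hypothetical class, the source is a synchronous Supplier, and only a single value or a single error is recorded.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java model of cache(): the first outcome is kept forever. */
final class CachedOutcome<T> {
    private final Supplier<T> source;
    private boolean done;
    private T value;
    private RuntimeException error;

    CachedOutcome(Supplier<T> source) {
        this.source = source;
    }

    /** Invokes the source on the first call only, then replays the outcome. */
    synchronized T get() {
        if (!done) {
            try {
                value = source.get();
            } catch (RuntimeException e) {
                error = e;                 // the error is recorded too
            }
            done = true;
        }
        if (error != null) {
            throw error;                   // replayed to every later caller
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical source that would succeed if it were asked a second time.
        CachedOutcome<String> cached = new CachedOutcome<>(() -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("network down");
            }
            return "featured";
        });
        for (int i = 0; i < 2; i++) {
            try {
                cached.get();
            } catch (IllegalStateException e) {
                System.out.println("replayed error: " + e.getMessage());
            }
        }
        System.out.println("source invoked " + calls.get() + " time(s)");
    }
}
```

The second get() never reaches the source, which is the situation described in the question: once a failure is cached, coming back to the page cannot recover.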

Alternative to cache()

replay().refCount() replays events to all current subscribers but forgets everything as soon as all of them unsubscribe (or complete). When new subscribers arrive, it re-subscribes to getFromNetwork() (with retry on error, of course).
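The reference-counting idea can also be modelled in plain Java. Again this is only a sketch, not how RxJava implements refCount(): RefCountedResult is a hypothetical class, the source is a synchronous Supplier, and callers must pair every subscribe() with an unsubscribe().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java model of replay().refCount(): shared while in use, then forgotten. */
final class RefCountedResult<T> {
    private final Supplier<T> source;
    private int subscribers;
    private boolean loaded;
    private T value;

    RefCountedResult(Supplier<T> source) {
        this.source = source;
    }

    /** The first subscriber triggers the fetch; later ones share its result. */
    synchronized T subscribe() {
        if (!loaded) {
            value = source.get();      // if this throws, nothing is stored
            loaded = true;
        }
        subscribers++;
        return value;
    }

    /** When the last subscriber leaves, the stored result is dropped. */
    synchronized void unsubscribe() {
        if (subscribers > 0 && --subscribers == 0) {
            loaded = false;
            value = null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger fetches = new AtomicInteger();
        RefCountedResult<Integer> shared =
                new RefCountedResult<>(() -> fetches.incrementAndGet());
        shared.subscribe();            // first component on the page
        shared.subscribe();            // second component shares the fetch
        shared.unsubscribe();
        shared.unsubscribe();          // page closed: result forgotten
        shared.subscribe();            // page reopened: fetches again
        System.out.println("fetches: " + fetches.get());
    }
}
```

Because a failed fetch leaves nothing stored in this model, the next subscribe() simply tries again, which matches the behaviour the question asks for.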

Operators mentioned but not needed

share() is shorthand for publish().refCount(). It lets multiple concurrent subscribers share a single subscription, i.e. it makes one call to subscribe() on the source instead of one per subscriber. Both cache() and replay().refCount() include the same functionality.

Yaroslav Stavnichiy
  • `getFeatured()` will be called from two places ended up calling `subscribe()`. Since Observable is immutable I guess I have to make instance variable? like I did to the Subject? `mFeaturedObs = getFromNetwork().cache().share()` Or `share()` actually doing that? – Fadli Jan 10 '17 at 06:32
  • As for `retryWhen()`, `getFromNetwork()` should be re-tried only when the current instance of activity call `getFeatured()` for the first time, since I don't want to retry every time the function get called, just when the user opens the activity. But I assume that can't be done without using some kind of boolean variable, right? – Fadli Jan 10 '17 at 06:33
  • Thanks for the additional explanation! Also for bringing up `replay().refCount()`. Have a feeling I'm gonna use this instead of `cache()` – Fadli Jan 11 '17 at 03:08