1

I just start learning rxJava for Android and want to implement the common use case:

  • request data from cache and show to the user
  • request data from web
  • server update data in storage and automatically show it to the user

Traditionally on of the best scenarios was use CursorLoader to get data from cache, run web request in the separate thread and save data to the disk via content provider, content provider automatically notify the listener and CursorLoader autoupdate UI.

In rxJava I can do it by running two different Observers as you can see in code below, but I don't find the way how to combine this two calls into the one to reach my aim. Googling shows this thread but it looks like it just get data from the cache or data from the web server, but don't do both RxJava and Cached Data

Code snippet:

@Override
public Observable<SavingsGoals> getCachedSavingsGoal() {
    return observableGoal.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

@Override
public Observable<SavingsGoals> getRecentSavingsGoal() {
    return api.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

    model.getCachedSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume cached data");
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            Log.d(App.TAG, "Show the next item");
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

    model.getRecentSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume data from the web", e);
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

Also, the one of issues with current approach is cache and web data are not garranted to be run sequently. It is possible when outdated data will come as latest and override recent from web.

To solve this issue I implemented Observer merge with filtration by timestamp: it get data from cache, pass it to the next observer and if cache is outdated fire new call to the web - case for thread competition solved by the filtration with timestamps. However, the issue with this approach I can not return cache data from this Observable - I need to wait when both requests finish their work.

Code snippet.

    @Override
public Observable<Timestamped<SavingsGoals>> getSavingGoals() {
    return observableGoal
            .getTimestampedSavingsGoals()
            .subscribeOn(Schedulers.io())
            .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                @Override
                public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> cachedData) {
                    Log.d(App.FLOW, "getTimestampedSavingsGoals");
                    return getGoalsFromBothSources()
                            .filter(filterResponse(cachedData));
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread());
}

private Func1<Timestamped<SavingsGoals>, Boolean> filterResponse(Timestamped<SavingsGoals> cachedData) {
    return new Func1<Timestamped<SavingsGoals>, Boolean>() {
        @Override
        public Boolean call(Timestamped<SavingsGoals> savingsGoals) {
            return savingsGoals != null
                    && cachedData != null
                    && cachedData.getTimestampMillis() < savingsGoals.getTimestampMillis()
                    && savingsGoals.getValue().getSavingsGoals().size() != 0;
        }
    };
}

private Observable<Timestamped<SavingsGoals>> getGoalsFromBothSources() {
    Log.d(App.FLOW, "getGoalsFromBothSources:explicit");
    return Observable.merge(
            observableGoal.getTimestampedSavingsGoals().subscribeOn(Schedulers.io()),
            api.getSavingsGoals()
                    .timestamp()
                    .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                        @Override
                        public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> savingsGoals) {
                            Log.d(App.FLOW, "getGoalsFromBothSources:implicit");
                            return observableGoal.saveAllWithTimestamp(savingsGoals.getTimestampMillis(), savingsGoals.getValue().getSavingsGoals());
                        }
                    }))
                    .subscribeOn(Schedulers.io());
}

Do you know the approach to do this in one Observer?

Potential solution:

@Override
public Observable<SavingsGoals> getSavingGoals() {
    return api.getSavingsGoals()
            .publish(network ->
                    Observable.mergeDelayError(
                            observableGoal.getSavingsGoals().takeUntil(network),
                            network.flatMap(new Func1<SavingsGoals, Observable<SavingsGoals>>() {
                                @Override
                                public Observable<SavingsGoals> call(SavingsGoals savingsGoals) {
                                    return observableGoal.saveAll(savingsGoals.getSavingsGoals());
                                }
                            })
                    )
            )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
  • Sorry, hot replacement in IDE hide the issue which this approach has: first one in case if network unavailable and cache thread completes first, the error will terminate whole merge (solved by mergeDelayError), second one is in case when cache is empty and returns first data from web request will not be return on subscriber. As you can see my method returns Observable after save and traditional merge as I shown in my code properly handle this case but takeUntil by some reason can not. Question is still open.
Community
  • 1
  • 1
Gleichmut
  • 5,953
  • 5
  • 24
  • 32

2 Answers2

4

For first question : You can save the result from Network Result by using doOnNext Method, It would looks something like this

public Observable<NetworkResponse> getDataFromNetwork(
      final Request request) {
    return networkCall.doOnNext(networkResponse -> saveToStorage(networkResponse);
  }

Now to combine the two results from both Storage and Online, the best way is to combine with publish and merge. I recommend watching this talk. The code would look something like this

  public Observable<Response> getData(final Request request) {

    return dataService.getDataFromNetwork(request)
        .publish(networkResponse ->  Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)));
  }

Why use publish and merge you my ask? publish method makes the response accessible in the callback. takeUntil means that you will take the data from storage but you will stop it IF for some reason, network call is finished before accessing storage data is finished. This way, you can be sure that new data from network is always shown even if it's finished before getting old data from storage.

The last but not least, in your subscriber OnNext just add the items to the list. (list.clear and list.addAll) Or similar functions or in you case view.showData()

EDIT: For The call getting disrupted when there's an error from network, add onErrorResumeNext at the end.

  public Observable<Response> getData(final Request request) {

    return dataService.getDataFromNetwork(request)
        .publish(networkResponse ->  Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)))
        .onErrorResumeNext(dataService.getDataFromStorage(request);
  }
Vincent Paing
  • 2,308
  • 3
  • 17
  • 29
  • Vincent, thank you for point out on this methods -- it was the right way! – Gleichmut Nov 25 '16 at 06:04
  • Sorry, hot replacement in IDE hide the issue which this approach has: first one in case if network unavailable and cache thread completes first, the error will terminate whole merge (solved by mergeDelayError), second one is in case when cache is empty and returns first data from web request will not be return on subscriber. As you can see my method returns Observable after save and traditional merge as I shown in my code properly handle this case but takeUntil by some reason can not. Question is still open. – Gleichmut Nov 25 '16 at 07:03
  • Why not move your saving code to doOnNext of network observable instead of saving it in flatMap. flatMap is used to convert an object into another object. Saving function should be on doOnNext as it should be saved only when it's success. – Vincent Paing Nov 25 '16 at 07:40
  • Just found the error in your potential solution code. takeUntil should be after getting the data from the disk. Not after merging two observable. – Vincent Paing Nov 25 '16 at 07:42
  • Thank you regarding point my attention to the proper use of takeUntil! However it is still the issue not calling onNext in case network call has issue (no internet connection). In that case I get an onError call for network command and can not get result from cache (mergeDelayError should solve this issue , but it doesn't Updated code in the question) – Gleichmut Nov 26 '16 at 05:09
  • Thank you, you helped me to reach the aim! – Gleichmut Nov 26 '16 at 11:39
  • Glad to be of help :D – Vincent Paing Nov 26 '16 at 11:43
0

I'd recommend to "listen" only to local data, and refresh it when API response came. Let say for getting local data you have something like:

@Nonnull
public Observable<SomeData> getSomeDataObservable() {
    return Observable
            .defer(new Func0<Observable<SomeData>>() {
                @Override
                public Observable<SomeData> call() {
                    return Observable.just(getSomeData());
                }
            });
}

So you need to add PublishSubject that will emit every time, when local data was updated (refreshSubject):

@Nonnull
public Observable<SomeData> getSomeDataObservableRefreshable() {
    return refreshSubject.startWith((Object)null).switchMap(new Func1() {
        public Observable<T> call(Object o) {
            return getSomeDataObservable();
        }
    }     
}

Now you need to subscribe only to getSomeDataObservableRefreshable(), and each time when data came from API, you update it and make refreshSubject .onNext(new Object())

Also i'd recommend to take a look to rx-java-extensions lib, it has alot of "cool tools" for RxAndroid. For example solution for your problem would be:

@Nonnull
public Observable<SomeData> getSomeDataObservable() {
    return Observable
            .defer(new Func0<Observable<SomeData>>() {
                @Override
                public Observable<SomeData> call() {
                    return Observable.just(getSomeData());
                }
            })
            .compose(MoreOperators.<SomeData>refresh(refreshSubject));
}
borichellow
  • 1,003
  • 11
  • 11
  • Thank you, it is interesting approach with two-way channel as subject. However there are few flaws: first one, client still need to be subscribed to API calls, otherwise framework never triggers the logic till anybody subscribes to this observable; another is client need to pass some fake data into subject to trigger it logic. – Gleichmut Nov 26 '16 at 07:14
  • it's true, so you can subscribe to API call, to display error message, if any error occurs. – borichellow Nov 26 '16 at 11:54