7

I have an Observable who reads data from a database. If data is null I need to get it from the network. So I do flatMap on first Observable, check the result of the database operation and if it is null I start that another Observable to fetch data from the network.

Note: Observables have different Subscribers because I have different postprocessing depending on where data comes from (such a logic).

 Observable.just(readDataFromDb()).flatMap(new Func1<SomeData, Observable<String>>() {
        @Override public Observable<SomeData> call(SomeData s) {
          if (s == null) {
            getReadFromNetworkObservable().subscribe(new AnotherSubscriber()); // this one might not complete
            return Observable.empty(); // I think I need to send this one only after readFromNetwork() completed
          } else {
            return Observable.just(s);
          }
        }
      }).subscribe(new SomeSubscirber());

Given I send Observable.empty() to exclude data processing for SomeSubscriber, I have a foreboding my second Observable can not always be finished because it might be simply garbage collected. I guess I saw it during my tests.

At this point, I think I just need to wait until Observable who reads from the network completed and then send Observable.empty(). So can I make the execution synchronous? But still I have a feeling I do it wrong.

Eugene
  • 59,186
  • 91
  • 226
  • 333
  • 1
    Check this answer https://stackoverflow.com/questions/26939175/rxjava-and-cached-data and this blog http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ . Also check `doOnNext` operator from RxJava to get some ideas. – Sergii Pechenizkyi Jul 12 '15 at 10:53
  • You can take a look on my question where I discovered several options to combine data from multiple sources. With community help we get the most robust version for this use case. http://stackoverflow.com/questions/40797144/andorid-rxjava-how-to-get-data-from-cache-and-and-the-same-time-update-it-in-th – Gleichmut Nov 26 '16 at 11:42

1 Answers1

7

You can make any observable as blocking with .toBlocking shortcut (See full info https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators)

Data d = getReadFromNetworkObservable()
            .toBlocking()
            .first() //  or single() or singleOrDefault()

// manipulate with data here

Combining cache with network data is described here: http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/

And here: RxJava and Cached Data

mkobit
  • 43,979
  • 12
  • 156
  • 150
Sergii Pechenizkyi
  • 22,227
  • 7
  • 60
  • 71
  • What's the difference between those two? – Eugene Jul 12 '15 at 11:02
  • They are the same to illustrate a point: there is no way `AnotherSubscriber` or lines below it could be called earlier than readDataFromNetwork returns a data result. – Sergii Pechenizkyi Jul 12 '15 at 11:06
  • I see. I've just realized I added confusion with `just()` operator. It is actually just an `Observer`. I've updated a question. – Eugene Jul 12 '15 at 11:08
  • Ok. Use `toBlocking` then. Also check that article describing more "natural" way to accomplish the task. – Sergii Pechenizkyi Jul 12 '15 at 11:30
  • I'm afraid in this case I can't use my Subsribers. – Eugene Jul 12 '15 at 11:39
  • Subscribers are callbacks. Callbacks are designed for handling results of asynchronous calls. You want the opposite - synchronous call. If you must use subscriber callback try to play with `Schedulers.immediate` but make sure, that your Observable returns only single result or explicitly calls `onComplete` or you will block the thread forever. – Sergii Pechenizkyi Jul 12 '15 at 11:59