I'm using Retrofit and RxJava to perform some background tasks. Code looks like this:

public class MyLoader {
  public Observable<MyData> getMyData() {
      // Chain the helper setup into the data query.
      return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() {
              @Override
              public Observable<MyData> call(MyHelper myHelper) {
                  return queryData(myHelper);
              }
      });
  }

  private Observable<MyData> queryData(MyHelper myHelper) {
      ...
  }

  private Observable<MyHelper> setupHelper() {
     return Observable.create(new Observable.OnSubscribe<MyHelper>() {
          @Override
          public void call(final Subscriber<? super MyHelper> subscriber) {
              try {
                MyHelper helper = makeRetrofitCall(); // Using Retrofit blocking call to get some data
                subscriber.onNext(helper);
                subscriber.onCompleted();
              } catch (RetrofitError e) {
                subscriber.onError(e);
              }
          }
     });
  }
}

This fails with a RetrofitError caused by a NetworkOnMainThreadException at this line:

  MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data

Subscribing to my Observable:

myLoader.getMyData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<MyData>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(MyData inventory) {

                    }
                });

According to the Rx documentation, flatMap does not operate on any background thread by default. How do I ensure that the whole getMyData() method runs in the background?

  • 2
    At first glance, I don't see anything wrong. Could you try and put `.subscribeOn(Schedulers.io())` into `setupHelper` instead? – akarnokd Sep 03 '15 at 09:53
  • Retrofit already integrates with RxJava and is able to return Observables directly, so you don't need to create your own Observables performing the requests. – BladeCoder Sep 07 '15 at 22:07
  • Did you find another approach than adding the `.subscribeOn()` in the second observable? – Juan Saravia Dec 09 '15 at 02:52
  • This seems to be the right answer: http://stackoverflow.com/a/35429084/2908525 – Juan Bustamante May 08 '17 at 20:12
  • Retrofit is capable of returning `Observable` directly, you shouldn't need to wrap the call like that. Otherwise, you can also simply use `Observable.fromCallable(this::makeRetrofitCall)` (assuming you have functions refs, either via Java 8 or via retrolambda). – njzk2 May 08 '17 at 20:24
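The suggestion in the comments above, to attach `subscribeOn` to the source that makes the blocking call, can be illustrated with a small plain-Java model. This is only a sketch and does not use RxJava: `Source`, `subscribeOn`, `setupHelper`, and the thread name `io-1` are hypothetical stand-ins, and RxJava's real implementation differs. It shows the one idea that matters here: the blocking work runs on whichever thread performs the subscription, so wrapping the subscription in an executor moves the work off the caller's thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SubscribeOnSketch {

    /** Hypothetical stand-in for an Observable: work starts only when subscribe is called. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Wraps a source so that its subscribe call, and the blocking work inside it, runs on the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return onNext -> executor.execute(() -> upstream.subscribe(onNext));
    }

    /** Hypothetical stand-in for setupHelper(): emits the name of the thread its work ran on. */
    static Source<String> setupHelper() {
        return onNext -> onNext.accept(Thread.currentThread().getName());
    }

    /** Subscribes from the calling thread and returns the name of the thread that did the work. */
    static String workerThreadName() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            subscribeOn(setupHelper(), io).subscribe(name -> {
                seen.set(name);
                done.countDown();
            });
            done.await(); // block only so the demo can report the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```

In this model the work is reported on the `io-1` thread even though `subscribe` was called from the main thread. Calling `setupHelper().subscribe(...)` directly, without the wrapper, would run the work on the caller's thread instead.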

3 Answers

4


I just added observeOn(Schedulers.newThread()) before flatMap and it works!

initialjie
  • Welcome to the site! [Please use Answers exclusively to answer the question](//meta.stackoverflow.com/q/92107). If you have a different question, please ask it by clicking the ["Ask Question" button at the top of the page](//stackoverflow.com/questions/ask). If you have the exact same issue as this one, when you have enough reputation, you can [upvote the question](//stackoverflow.com/privileges/vote-up), or [add a bounty on it](//stackoverflow.com/help/bounty). You can also "star" the question as a favorite, in which case the system will notify you of any new answers. – Mogsdad Mar 31 '16 at 02:02
  • 4
    Why was my answer not accepted? I can't see anything wrong with it. Could anybody tell me why? – initialjie Apr 18 '16 at 10:49
  • I think it's because by calling observeOn before the flatMap, you are switching everything to background threads, not just the one step on the pipeline. – Juan Bustamante May 08 '17 at 20:08
  • @Juan Bustamante subscribeOn only effective at the last called time. – initialjie Mar 03 '18 at 09:49
2

This moves just one step in the pipeline to the background thread:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

Originally answered here: https://stackoverflow.com/a/35429084/2908525

Community
Juan Bustamante
0

There is a good chance that when you create the MyLoader object on the main thread, the body of Observable.create is executed there as well (or perhaps somewhere earlier in your code). If so, .subscribeOn(Schedulers.io()) will have no effect on the thread it runs on.

You can try wrapping the .create() in a .defer() to make sure the Observable is created only when it is subscribed to.

e.g. defer(() -> create(....))
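The timing difference that `defer` is meant to address can be sketched in plain Java, without RxJava. Everything here is a hypothetical stand-in: `makeCall` plays the role of the blocking `makeRetrofitCall()`, a `Supplier` plays the role of the Observable, and `get()` models subscription. Whether the eager case actually occurs in the question's code is not established; the sketch only shows what deferring changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class DeferSketch {

    /** Records the order of events so the timing difference is visible. */
    static final List<String> log = new ArrayList<>();

    /** Hypothetical stand-in for the blocking makeRetrofitCall() from the question. */
    static String makeCall() {
        log.add("call");
        return "helper";
    }

    /** Eager: the call runs while the source is being built, on whatever thread builds it. */
    static Supplier<String> eager() {
        String value = makeCall();
        return () -> value;
    }

    /** Deferred: the call runs only when get() is invoked, which models subscription. */
    static Supplier<String> deferred() {
        return () -> makeCall();
    }

    /** Builds a source, then "subscribes" to it, and returns the recorded event order. */
    static List<String> trace(boolean defer) {
        log.clear();
        Supplier<String> source = defer ? deferred() : eager();
        log.add("built");
        source.get(); // models subscribe()
        log.add("subscribed");
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + trace(false));
        System.out.println("deferred: " + trace(true));
    }
}
```

In the eager case the call is logged before the source is built, so it runs on the thread that constructs the source and no later scheduler choice can move it. In the deferred case the call is logged only after subscription, which is the point at which a `subscribeOn` scheduler would apply.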

Diolor