I have multiple network calls in my app. I run each network request on the IO thread by applying this transformer with the compose operator:

public static <T> Transformer<T, T> runOnIoThread()
{
    return tObservable -> tObservable.subscribeOn( Schedulers.io() )
        .observeOn( AndroidSchedulers.mainThread() );
}

This seems to work well as long as I only make single network calls. However, if I chain them as in the following example, I get Android's NetworkOnMainThreadException.

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .compose( runOnIoThread() );
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .compose( runOnIoThread() );
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .flatMap( result1 -> networkCall2( result1 ) );
}

I had assumed that compose is applied to the complete observable chain before the call, and that a later compose call would "overwrite" the behavior of an earlier one. In fact, it looks like the observeOn call of the first compose (observeOn main thread) dominates the second compose call (subscribeOn the IO thread). One obvious solution would be to have two versions of networkCall1: one that applies schedulers and one that doesn't. However, this would make my code quite verbose.

Do you know better solutions? Can you explain the behavior of applying schedulers twice (with compose) in an observable chain?

Edit: I'm using Retrofit with RxJava for my network calls.

Peter F

1 Answer

Only one subscribeOn() takes effect per stream: the call closest to the source decides which thread the source is subscribed on, and a second subscribeOn() on the same stream has no practical effect. As such, when you chain your two methods together you run:

observeOn(AndroidSchedulers.mainThread())

which switches the operation over to the main thread. After that it then stays there because the next subscribeOn() is effectively being ignored.
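
As a rough illustration of these two rules, here is a toy model built on plain java.util.concurrent executors rather than on RxJava itself. The Source interface and the named, subscribeOn, observeOn and trace helpers are hypothetical stand-ins that only imitate how the real operators choose threads, so treat this as a sketch of the semantics, not as the library's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Toy model only: these helpers imitate the scheduling semantics; they are not RxJava.
class SchedulerSketch {

    /** A one-value stream: subscribing runs the source and passes one value downstream. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Single-threaded executor whose daemon thread carries the given name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Imitates subscribeOn: the upstream subscription is started on the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    /** Imitates observeOn: the value is handed downstream on the executor. */
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService executor) {
        return downstream ->
                upstream.subscribe(value -> executor.execute(() -> downstream.accept(value)));
    }

    /** Applies two subscribeOn calls (and optionally one observeOn), then returns "sourceThread -> consumerThread". */
    static String trace(ExecutorService first, ExecutorService second, ExecutorService observer) {
        // The source reports the name of the thread it was executed on.
        Source<String> source = downstream -> downstream.accept(Thread.currentThread().getName());
        // 'first' is the subscribeOn closest to the source, 'second' is applied after it.
        Source<String> chain = subscribeOn(subscribeOn(source, first), second);
        if (observer != null) {
            chain = observeOn(chain, observer);
        }

        String[] result = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        chain.subscribe(sourceThread -> {
            result[0] = sourceThread + " -> " + Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        // Two subscribeOn calls: the source still runs on "io", the one closest to it.
        System.out.println(trace(named("io"), named("computation"), null));        // prints "io -> io"
        // Adding observeOn moves only the delivery of the value to "ui".
        System.out.println(trace(named("io"), named("computation"), named("ui"))); // prints "io -> ui"
    }
}
```

In this model the second subscribeOn merely starts the first one from another thread, which then hops to its own executor anyway, while each observeOn changes the thread for everything downstream of it.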

I would suggest that you are actually overcomplicating things with your compose method. Just add

subscribeOn(Schedulers.io())

to both your network calls and then use

observeOn(AndroidSchedulers.mainThread())

just before you want to process results on the main thread. You'd end up with something like:

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .subscribeOn(Schedulers.io());
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .subscribeOn(Schedulers.io());
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .flatMap( result1 -> networkCall2( result1 ) )
            .observeOn(AndroidSchedulers.mainThread());
}

EDIT

If you really want to have an observeOn() call in the individual network call methods, you can. You would then have to add an extra observeOn(Schedulers.io()) before the flatMap() in your chainedCalls() method, so that the stream is moved off the main thread again before the second call is made. You can have as many observeOn() calls as you like per stream. It would be something like:

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .observeOn(Schedulers.io())
            .flatMap( result1 -> networkCall2( result1 ) )
            .observeOn(AndroidSchedulers.mainThread());
}
Jahnold
  • Hi Jahnold! Thank you! Your explanation helped me better understand subscribeOn, observeOn, and compose. Since my methods will be part of a library, I'd like to make the calls as easy as possible (i.e. sparing the observeOn call). Jake Wharton wrote [here](http://stackoverflow.com/a/21010181/2011622) about Retrofit with RxJava that it executes calls on an IO/background thread and that observeOn is on the caller's thread. Chaining two Retrofit calls is still possible though. So there must be a way to do that, right? – Peter F Apr 26 '16 at 16:59
  • I've updated the answer to show how you can use an extra `observeOn` if you really want them to all be part of the calls. – Jahnold Apr 26 '16 at 21:32
  • Thanks for elaborating! So while `subscribeOn` can only be used once in a stream, `observeOn` can be used multiple times to switch to different threads. I couldn't find that information in the documentation. How did you know that? Do you have a reference? – Peter F Apr 27 '16 at 08:30
  • In the official documentation, I found a good [example + explanation](http://reactivex.io/documentation/operators/subscribeon.html) showing a stream with multiple `observeOn` calls and one `subscribeOn` call. – Peter F Apr 27 '16 at 08:53