I am trying to implement an asynchronous task using RxJava on Android. The following code does not work as expected: the method executes on the UI thread. I am using RxAndroid 0.24.0.

try {
    Observable.just(someMethodWhichThrowsException())
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> onMergeComplete());
}
catch (IOException e) {
    e.printStackTrace();
}

However, the following works asynchronously for me.

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            someMethodWhichThrowsException();
        } catch (IOException e) {
            e.printStackTrace();
        }

        subscriber.onCompleted();
    }
});
observable
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

I am trying to understand the following:

  1. What is the difference between them?
  2. What is the best practice while creating async tasks?
Peter Mortensen
muneikh

2 Answers

  1. What is the difference between them?
Observable.just(someMethodWhichThrowsException())
    .subscribeOn(Schedulers.newThread())

This is equivalent to the following:

Object someResult = someMethodWhichThrowsException();
Observable.just(someResult)
    .subscribeOn(Schedulers.newThread())

As you can see, this makes the synchronous method call first, on the calling thread, and then passes its result to Observable.just to become an Observable.

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            ...
        }
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

This method, however, will run the code in the call block on subscription. You've told it you want to subscribe on a new thread (subscribeOn(Schedulers.newThread())), so the subscription happens on a new thread, and the code which gets run on subscription (the call block) gets run on that thread too. This is similar behaviour to calling Observable.defer.
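To make the difference concrete without pulling in Rx, here is a minimal plain-Java sketch of the same effect. It is only an analogue, not RxJava code: `EagerVsDeferred`, `work()`, `eager()` and `deferred()` are made-up names, `work()` stands in for someMethodWhichThrowsException(), and a CompletableFuture with a single-thread executor stands in for the Observable and Schedulers.newThread().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EagerVsDeferred {
    // Hypothetical stand-in for someMethodWhichThrowsException():
    // returns the name of the thread it actually ran on.
    static String work() {
        return Thread.currentThread().getName();
    }

    // Like Observable.just(work()): Java evaluates the argument first,
    // on the calling thread, and only then hands the finished value over.
    static String eager() {
        return CompletableFuture.completedFuture(work()).join();
    }

    // Like create/defer plus subscribeOn: only a reference to work() is
    // passed, and the executor's thread is the one that invokes it.
    static String deferred() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(EagerVsDeferred::work, worker).join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("eager:    " + eager());
        System.out.println("deferred: " + deferred());
    }
}
```

Running it should show `eager` reporting the caller's thread and `deferred` reporting the executor's thread, which is the same split you see between your two snippets.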

  2. What is the best practice while creating async tasks?

Well, that's up to you and your desired behaviour. Sometimes you want the async code to begin running immediately (in which case you may want to cache it using one of the operators for that purpose). I'd definitely consider using the Async Utils library for this.

Other times, you'll want it to run only on subscription (which is the behaviour in the examples here) - for example if there are side-effects, or if you don't care when it's run and just want to use the built-ins to get something off the UI thread. Dan Lew mentions that Observable.defer is very handy for taking old code and getting it off the UI thread, during a conversion to Rx.
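The trade-off between the two styles can be sketched in plain Java as well. This is an analogy rather than Rx code, and every name in it (`StartTiming`, `runsWhenStartedEagerly()`, `runsWhenDeferred()`) is hypothetical: a CompletableFuture plays the role of work that starts immediately and keeps its result, while a bare Supplier plays the role of work that only runs when asked, and runs again each time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class StartTiming {
    // Start-immediately style: the task is kicked off once, up front,
    // and every later read reuses the stored result.
    static int runsWhenStartedEagerly() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<Integer> started = CompletableFuture.supplyAsync(runs::incrementAndGet);
        started.join();
        started.join(); // second read does not rerun the task
        return runs.get();
    }

    // Run-on-demand style: nothing happens until get() is called,
    // and each call runs the task (and its side effects) again.
    static int runsWhenDeferred() {
        AtomicInteger runs = new AtomicInteger();
        Supplier<Integer> lazy = runs::incrementAndGet;
        lazy.get();
        lazy.get();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("eager start, two reads: " + runsWhenStartedEagerly() + " run(s)");
        System.out.println("deferred, two reads:    " + runsWhenDeferred() + " run(s)");
    }
}
```

If the task has side effects, the second style repeats them on every read, so which one you want depends on whether repeating the work is acceptable.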

Adam S
  • No problem! The other answers gave you a solution, but didn't really give much of an explanation. I'd recommend going with the built-in `defer` over adding the extra library, especially as it's for Android - keep that APK size down :) – Adam S Apr 16 '15 at 13:04
  • Thanks, I didn't realize that. I think this should be the recommended approach. :) – muneikh Apr 16 '15 at 13:09
  • Async Utils library seems really old, and not updated for almost a year. – IgorGanapolsky Sep 29 '15 at 04:10
  • It's a small library and if it's stable there's no need to update it, though I understand the sentiment. They're currently [accepting pull requests](https://github.com/ReactiveX/RxJavaAsyncUtil/pull/6) so it's definitely not dead! Everything that the library does you can build yourself anyway, but it's especially useful to help out when you're starting out with Rx. – Adam S Sep 29 '15 at 12:56

Use Async.start() from the RxJava Async Utils library. It calls the function you provide on another thread.

Example:

Observable<String> observable = Async.start(new Func0<String>() {
    @Override
    public String call() {
        try {
            return someMethodWhichThrowsException();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
});

As you note, checked exceptions must be wrapped in a RuntimeException.
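The same wrapping pattern applies to any functional interface whose method declares no checked exceptions. Here is a small plain-Java sketch using Supplier instead of Func0; `WrapChecked`, `read()`, `asSupplier()` and `causeMessage()` are hypothetical names, and `read()` is a stand-in that always fails so the error path is visible.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class WrapChecked {
    // Hypothetical stand-in for a method that throws a checked exception.
    static String read() throws IOException {
        throw new IOException("disk unavailable");
    }

    // Supplier.get() declares no checked exceptions, so the IOException
    // has to be caught and rethrown as an unchecked one.
    static Supplier<String> asSupplier() {
        return () -> {
            try {
                return read();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    // The original exception is still available as the cause.
    static String causeMessage() {
        try {
            asSupplier().get();
            return "no error";
        } catch (UncheckedIOException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("cause: " + causeMessage());
    }
}
```

Keeping the original exception as the cause means whoever handles the failure can still unwrap it and inspect the IOException.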

See also https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start

clemp6r
  • Thanks... this is really helpful. It would be great if you had any pointers on how this differs from my original code. – muneikh Apr 16 '15 at 11:57
  • I can answer. Observable.just() allows creating an observable from an existing result. If you write Observable.just(myFunction()), myFunction() is executed and then the result is passed to the just method (as in any Java expression). So this is not asynchronous at all. On the other hand, when you call Async.start(), your computation method is not called yet. It will be called later in another thread, when Rx calls func.call() (where func is the parameter provided to Async.start()). – clemp6r Apr 16 '15 at 12:00
  • One note on `Async.start`: it will immediately start the work, even before subscribe, which sometimes is exactly what you want. If you want to wait until subscribe, use `Async.toAction(...).call()` or `Observable.defer`. In this case, `defer` would handle the exception more cleanly, since you can `return Observable.error(new MyException());` instead of throwing it in there, although a thrown exception would produce the same result. – lopar Apr 16 '15 at 15:27
  • @lopar, RxJavaAsyncUtil provides [an overloaded start signature](https://github.com/ReactiveX/RxJavaAsyncUtil/blob/0.x/src/main/java/rx/util/async/Async.java#L96) that also takes a `Scheduler`, described as the "Scheduler to run the function on". Can I trust that detail? – Eido95 Nov 21 '16 at 06:56