1

I have started learning RxAndroid, and below is the code I wrote to iterate over a list of model objects (Result) fetched from the server. Inside the Observable I convert each item into a newly created object, which the Observer then collects. I want to keep hold of the subscription so that I can unsubscribe when the fragment goes through an orientation change. However, subscribe() returns void instead of a Subscription object.

Questions:

  • Does the latest version of RxAndroid handle unsubscription itself upon configuration/orientation change?
  • If a configuration change happens before the task completes, the only way I can think of to restart the task is to persist the server response in onSaveInstanceState() and retrieve it from the Bundle when the fragment is recreated. That requires boolean flags to track whether the configuration change happened before the task completed. Is there a cleaner, more graceful way of coping with this?

    private void createComicList(final List<Result> marvelResults) {
        final MarvelComics marvelComics = new MarvelComics();
    
        Observable<MarvelComic> marvelObservable2 = Observable.create(new ObservableOnSubscribe<MarvelComic>() {
            @Override
            public void subscribe(ObservableEmitter<MarvelComic> e) throws Exception {
                for(Result result : marvelResults) {
                    MarvelComic marvelComic = new MarvelComic();
                    marvelComic.setDescription(result.getDescription());
                    marvelComic.setTitle(result.getTitle());
                    marvelComic.setPageCount(result.getPageCount());
                    marvelComic.setThumbnailUrl(result.getThumbnail().getPath());
                    marvelComic.setId(result.getId());
    
                    e.onNext(marvelComic);
                }
                e.onComplete();
            }
        });
    
        marvelObservable2.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<MarvelComic>() {
    
                    @Override
                    public void onSubscribe(Disposable d) {
                    }
    
                    @Override
                    public void onNext(MarvelComic comic) {
                        marvelComics.getMarvelComicList().add(comic);
                    }
    
                    @Override
                    public void onError(Throwable e) {
    
                    }
    
                    @Override
                    public void onComplete() {
                        showToast();
                    }
                });
    }
    
Umer Farooq

2 Answers

0

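In RxJava 2.x, Observable.subscribe(Observer<? super T>) returns void because cancellation now goes through Observer.onSubscribe(Disposable): the Disposable passed to that method replaces the Subscription that subscribe() used to return in 1.x. You can either keep that Disposable yourself or use subscribeWith() with a DisposableObserver: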

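    import io.reactivex.Observable;
    import io.reactivex.Observer;
    import io.reactivex.disposables.CompositeDisposable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.DisposableObserver;

    final CompositeDisposable composite = new CompositeDisposable();

    Observable<Integer> source = Observable.just(1);

    // Option 1: capture the Disposable that is handed to onSubscribe().
    source.subscribe(new Observer<Integer>() {
        @Override public void onSubscribe(Disposable d) {
            composite.add(d);   // <---------------------------------------------
        }
        @Override public void onNext(Integer t) {
            System.out.println(t);
        }

        @Override public void onError(Throwable e) {
            e.printStackTrace();
        }
        @Override public void onComplete() {
            System.out.println("Done");
        }
    });

    // Option 2: subscribeWith() returns the observer it was given, and a
    // DisposableObserver is itself a Disposable, so it can be added directly.
    composite.add(source
            .subscribeWith(  // <-----------------------------------------------
                    new DisposableObserver<Integer>() {
        @Override public void onNext(Integer t) {
            System.out.println(t);
        }

        @Override public void onError(Throwable e) {
            e.printStackTrace();
        }
        @Override public void onComplete() {
            System.out.println("Done");
        }
    }));
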
akarnokd
  • Thanks for the comprehensive example. Can you please explain the reason for creating a new DisposableObserver()? Can't we just call compositeDisposable.dispose() to dispose the added disposable? – Umer Farooq Apr 11 '17 at 11:12
  • `DisposableObserver` lets you dispose `Observer`s individually, not just en masse. For example, suppose you have a periodic flow and a one-time flow, and the user can cancel the latter with a button click; you wouldn't dispose the periodic one in that case, but you'd want to dispose both when the Activity is destroyed. – akarnokd Apr 11 '17 at 12:09
  • Won't compositeDisposable.dispose() dispose all of the disposables? I am executing this statement in onStop() of the fragment. Another thing that confuses me: what if the fragment's configuration changes before the RxAndroid task has completed? To keep track of whether the task completed or not, I would have to create boolean flags, which isn't an elegant solution. How do you recommend coping with this situation? – Umer Farooq Apr 11 '17 at 12:45
  • Use `clear()` instead of `dispose()` if you want to reuse the same composite. – akarnokd Apr 11 '17 at 15:10
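
To make the points from the comments concrete (disposing one flow individually, and `clear()` versus `dispose()` on the composite), here is a minimal plain-Java sketch. It is not RxJava's implementation: `MiniDisposable` and `MiniComposite` are hypothetical stand-ins that only mimic the documented contract of `Disposable` and `CompositeDisposable`, and they ignore thread safety, which the real classes handle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for io.reactivex.disposables.Disposable (assumption,
// not the library class): it only remembers whether it was disposed.
final class MiniDisposable {
    private boolean disposed;

    void dispose() { disposed = true; }
    boolean isDisposed() { return disposed; }
}

// Hypothetical stand-in for CompositeDisposable (assumption, single-threaded).
final class MiniComposite {
    private final List<MiniDisposable> items = new ArrayList<>();
    private boolean disposed;

    // Like CompositeDisposable.add(): a disposed container rejects the
    // newcomer and disposes it immediately.
    boolean add(MiniDisposable d) {
        if (disposed) {
            d.dispose();
            return false;
        }
        items.add(d);
        return true;
    }

    // Like clear(): dispose everything held, but keep the container usable.
    void clear() {
        for (MiniDisposable d : items) {
            d.dispose();
        }
        items.clear();
    }

    // Like dispose(): dispose everything held and retire the container.
    void dispose() {
        clear();
        disposed = true;
    }

    int size() { return items.size(); }
    boolean isDisposed() { return disposed; }
}

final class ClearVersusDispose {
    public static void main(String[] args) {
        MiniComposite composite = new MiniComposite();
        MiniDisposable periodic = new MiniDisposable();
        MiniDisposable oneShot = new MiniDisposable();
        composite.add(periodic);
        composite.add(oneShot);

        // "Cancel" button: only the one-time flow is disposed.
        oneShot.dispose();
        System.out.println("periodic disposed: " + periodic.isDisposed()); // false

        // e.g. onStop(): everything held is disposed, the composite is reusable.
        composite.clear();
        System.out.println("periodic disposed: " + periodic.isDisposed()); // true
        System.out.println("add after clear: " + composite.add(new MiniDisposable())); // true

        // After dispose() the composite rejects and disposes anything added.
        composite.dispose();
        MiniDisposable late = new MiniDisposable();
        System.out.println("add after dispose: " + composite.add(late)); // false
        System.out.println("late disposed: " + late.isDisposed()); // true
    }
}
```

In a fragment this would roughly correspond to calling `clear()` in `onStop()` when the same `CompositeDisposable` field is filled again in `onStart()`, and reserving `dispose()` for the point where the composite is no longer needed.
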
-2

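In earlier versions of RxJava (1.x), Observable's subscribe() method returns a Subscription object. In the current version (2.x), the subscribe() overloads that take callbacks (Consumer, Action) return a Disposable, which you can cancel by calling dispose(); the overload that takes an Observer returns void.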

For your second question, you may want to check this answer: Best practice: AsyncTask during orientation change

Aldrin Joe Mathew