I have code like this:

Process X:

 getLocationObservable() // async operation that fetches the location
    // Once the location is found (or could not be found), it is passed to this filter:
            .filter(location -> {
                // --- Operation A ---
                // After Operation A finishes, I either return 'true' and continue
                // to the next observable, which is a Retrofit server call, or simply
                // return 'false' and quit.
            })
            .flatMap(location -> getRetrofitServerCallObservable( location )
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread()))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    new Observer<MyCustomResponse>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            _disposable = d;
                        }
                        @Override
                        public void onNext(MyCustomResponse response) {

                        }
                        @Override
                        public void onError(Throwable e) {

                        }
                        @Override
                        public void onComplete() {

                        }
                    });

In the Location class it looks like this:

 private PublishSubject<Location> locationPublishSubject = PublishSubject.create();

    public Observable<Location> getLocationObservable() {
        return locationPublishSubject;
    }

    // and then...
    locationPublishSubject.onNext( foundLocation );

Problems with PublishSubject:

  1. If I call locationPublishSubject.onError( new <some mock exception> ), it crashes with io.reactivex.exceptions.UndeliverableException.
  2. If I call locationPublishSubject.onComplete() right after onNext, the onNext callback is never invoked. It jumps straight to onComplete.
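
For reference, here is a minimal sketch of one situation in which RxJava 2.x reports an UndeliverableException: signalling onError on a PublishSubject that has already terminated. It assumes RxJava 2.x is on the classpath, and it is not necessarily what happens in the code above. The class name SubjectErrorSketch and the exception messages are hypothetical, and the global error handler is installed only to capture the error instead of crashing the thread.

```java
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class SubjectErrorSketch {

    // Returns the events that were observed, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Errors that RxJava cannot deliver to any observer end up here.
        // Without a handler, they are passed to the thread's uncaught exception handler.
        RxJavaPlugins.setErrorHandler(
                e -> events.add("undeliverable: " + e.getClass().getSimpleName()));
        try {
            PublishSubject<String> subject = PublishSubject.create();

            subject.subscribe(
                    v -> events.add("onNext: " + v),
                    e -> events.add("onError: " + e.getMessage()),
                    () -> events.add("onComplete"));

            // The first error reaches the observer's onError callback.
            subject.onError(new Exception("first"));

            // The subject has already terminated, so this error has nowhere to go
            // and is routed to the global error handler instead.
            subject.onError(new Exception("second"));
        } finally {
            // Restore the default plugin state.
            RxJavaPlugins.reset();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If the crash in the real code has this cause, the stack trace of the UndeliverableException should contain the original exception as its cause, which is why the full exception is useful for diagnosis.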
BVtp
  • 1.) Please kindly provide the full exception. 2.) `PublishSubject` does not retain any `onNext` item. If you don't have an observer the moment you call `onNext`, that item is lost. See if `PublishSubject.hasObservers` returns false before you call `onNext` or not. – akarnokd Jun 24 '18 at 08:27
  • 1) I have no specific exception.. in some cases it is a regular `Exception` object. I just want to be able to throw even an empty exception rather than passing through `onNext` a `null` wrapped in some custom class. 2) `hasObservers` returns `true` before I call `onNext`. And yet, even if I call `onComplete` right after `onNext` - `onNext` callback is never called. It jumps straight to `onComplete`. Which means basically that I have to call only `onNext` and then manually call `dispose()` on the disposable. Please correct me if I'm wrong though.. – BVtp Jun 24 '18 at 08:39
  • 1) do you want to continue after an error? Consider wrapping. 2) You have a filter in your flow, that may filter out your item. – akarnokd Jun 24 '18 at 09:39
  • 1) Ideally, I'd prefer to catch the error in the `onError` callback and then execute some code I built for this case, instead of analyzing whether it was successful during `filter`, because with `onError` I don't have to manually call `dispose()`. 2) Because of the filter I cannot use `onComplete`? Is this why it skips the `onNext` callback if I call `onComplete` right after `onNext`? So after every use of the disposable I should call `dispose()` manually? – BVtp Jun 24 '18 at 09:45
  • The code you provided and the behavior you are describing do not match. Please provide a standalone example that reproduces your problem. – akarnokd Jun 24 '18 at 10:13
  • I'm sorry, but it's exactly the code I'm using (I only removed the code inside `filter` and the other callbacks because they're irrelevant; the problems occur before them). – BVtp Jun 24 '18 at 10:25
  • Please place `doOnNext`s at various places to see where `onNext`s disappear. Does the code run multiple times? If you `onComplete` a subject, the rest of the code won't execute next time and you get `onComplete` through the chain. – akarnokd Jun 24 '18 at 11:58
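
The diagnostic steps suggested in the comments above could be sketched roughly as follows. This is a simplified, synchronous stand-in and not the code from the question: it assumes RxJava 2.x, uses an Integer in place of the Location, omits the Retrofit call and the schedulers, and the class name SubjectProbeSketch and the log labels are hypothetical. It places `doOnNext` probes around the filter and then subscribes a second time to the same subject after it has completed.

```java
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class SubjectProbeSketch {

    // Returns the log lines that were recorded, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        PublishSubject<Integer> subject = PublishSubject.create();

        // First run: probes before and after the filter show where an item disappears.
        subject.doOnNext(v -> log.add("run1 before filter: " + v))
                .filter(v -> v > 0)
                .doOnNext(v -> log.add("run1 after filter: " + v))
                .subscribe(
                        v -> log.add("run1 onNext: " + v),
                        e -> log.add("run1 onError"),
                        () -> log.add("run1 onComplete"));

        log.add("hasObservers: " + subject.hasObservers());
        subject.onNext(1);
        subject.onComplete();

        // Second run on the SAME subject: it has already completed, so the new
        // observer only receives onComplete, and later items are dropped.
        subject.doOnNext(v -> log.add("run2 before filter: " + v))
                .subscribe(
                        v -> log.add("run2 onNext: " + v),
                        e -> log.add("run2 onError"),
                        () -> log.add("run2 onComplete"));

        log.add("hasObservers: " + subject.hasObservers());
        subject.onNext(2);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the first run logs the item at every probe before `onComplete`, while the second run logs only `onComplete`. If the real code reuses one subject across several location requests, creating a fresh subject per request would be one way to avoid the second pattern.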

0 Answers