43

As I understand RxJava2 values.take(1) creates another Observable that contains only one element from the original Observable. Which MUST NOT throw an exception as it is filtered out by the effect of take(1) as it's happened second.

as in the following code snippet

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Output

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

My questions :

  1. Am I understanding it correct ?
  2. What's really happening to cause the exception.
  3. How to solve this from the consumer ?
abd3lraouf
  • 1,438
  • 1
  • 18
  • 24

4 Answers4

63
  1. Yes, but because the observable 'ends' does not mean the code running inside create(...) is stopped. To be fully safe in this case you need to use o.isDisposed() to see if the observable has ended downstream.
  2. The exception is there because RxJava 2 has the policy of NEVER allowing an onError call to be lost. It is either delivered downstream or thrown as a global UndeliverableException if the observable has already terminated. It is up to the creator of the Observable to 'properly' handle the case where the observable has ended and an Exception occurs.
  3. The problem is the producer (Observable) and the consumer (Subscriber) disagreeing on when the stream ends. Since the producer is outliving the consumer in this case, the problem can only be fixed in the producer.
Kiskae
  • 24,655
  • 2
  • 77
  • 74
  • 6
    would `if (!o.isDisposed()) { o.onError(new Exception("Oops")); }` be a correct way to handle that? – P.J.Meisch Apr 20 '17 at 17:20
  • 2
    If it is acceptable for that exception to be lost in the case that the observable is no longer observed then yes. If the exception should really go somewhere then it should be called unconditionally. – Kiskae Apr 20 '17 at 17:22
  • @Kiskae Is there a way to handle this from consumer ? – abd3lraouf Apr 20 '17 at 17:33
  • 2
    No, this needs to be fixed in the producer since the consumer has declared itself as terminated. – Kiskae Apr 20 '17 at 18:02
  • Thanks @Kiskae please update your answer to sum up all comments. I'll mark the question as **answered** – abd3lraouf Apr 20 '17 at 18:07
  • I've added one more question to the list – abd3lraouf Apr 20 '17 at 18:07
  • 2
    if (!o.isDisposed()) { o.onError(new Exception("Oops")); } is not the correct way to handle this because of race conditions (o can be disposed between the if condition and the onError call and that's not theoretical, it happens in productive systems). See also this discussion: https://github.com/ReactiveX/RxJava/issues/4880 – Emanuel Moecklin Sep 07 '17 at 22:09
  • @EmanuelMoecklin At the time of posting, that method was not available in a public release. It is indeed the correct way to handle it post-RxJava 2.1.1 – Kiskae Sep 07 '17 at 23:07
  • @Kiskae tryOnError was indeed introduced after the question/answer. Nevertheless the if (!o.isDisposed... approach wasn't correct before that either (I had crashes because of that). There were workarounds like overwriting the standard RxJava error handler to deal with it. – Emanuel Moecklin Sep 08 '17 at 15:30
  • 2
    @AbdElraoufSabri I threw away that code once the tryOnError was introduced. Could find it in some old commit but what's the point now when you have tryOnError? – Emanuel Moecklin Sep 21 '17 at 00:23
  • 1
    @EmanuelMoecklin I'm still getting the `UndeliverableException` error even when I use `tryOnError`! `if (emitter.isDisposed.not()){emitter.tryOnError(getFailureError(data.error()))}` – Dr.jacky May 24 '19 at 10:48
  • @Kiskae `No, this needs to be fixed in the producer since the consumer has declared itself as terminated.` How can I fix it on the producer side? Please read the above comment. – Dr.jacky May 24 '19 at 10:52
  • If you're using `tryOnError` then the origin of that exception lies elsewhere. Without the full stacktrace it would be impossible to answer. – Kiskae May 24 '19 at 12:55
  • " To be fully safe in this case you need to use o.isDisposed() to see if the observable has ended downstream." - where is it documented that isDisposed() checked if downstream was ended? – Malachiasz Aug 13 '19 at 06:55
22

@Kiskae in previous comment correctly answered about the reason why such exception can occurs.

Here the link to official doc about this theme: RxJava2-wiki.

Sometimes you cannot change this behaviour so there is a way how to handle this UndeliverableException's. Here is code snippet of how to avoid crashes and misbehaviour:

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

This code taken from the link above.

Important note. This approach sets global error handler to RxJava so if you can to get rid of these exceptions - it would be better option.

Ilia Kurtov
  • 999
  • 1
  • 12
  • 18
  • I can't to get rid of these exception and I need some data from this RX but when this error happen I can't get my data from Json ..what should I do now?:( – Anice Jahanjoo Oct 12 '19 at 12:59
  • When I make some network request inside of `Observable.create` my `subscriber` is not disposed at the beginning of the network call and is already disposed when I am getting response on the call. Is there a way to do not get `InterruptedException` in `RxJavaPlugins.setErrorHandler`? – isabsent Nov 24 '19 at 14:29
5

Kotlin

I call this in MainActivity onCreate method

private fun initRxErrorHandler(){
    RxJavaPlugins.setErrorHandler { throwable ->
        if (throwable is UndeliverableException) {
            throwable.cause?.let {
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
                return@setErrorHandler
            }
        }
        if (throwable is IOException || throwable is SocketException) {
            // fine, irrelevant network problem or API that throws on cancellation
            return@setErrorHandler
        }
        if (throwable is InterruptedException) {
            // fine, some blocking code was interrupted by a dispose call
            return@setErrorHandler
        }
        if (throwable is NullPointerException || throwable is IllegalArgumentException) {
            // that's likely a bug in the application
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        if (throwable is IllegalStateException) {
            // that's a bug in RxJava or in a custom operator
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        Log.w("Undeliverable exception", throwable)
    }
}
Hamid Zandi
  • 2,714
  • 24
  • 32
0

While using observable.create() just go with tryOnError(). onError() doesn't guaranty that error will get handled. There are various error handling operators are there HERE

Ayush Jain
  • 563
  • 7
  • 11