Suppose I have an Observable:

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4);
Observable<Integer> o1 = Observable.from(ints);

I want to derive another Observable that divides 12 by each value:

Observable<Integer> o2 = o1.map(i -> 12/i);
o2.subscribe(
  v -> logger.info("Subscriber value {}", v),
  t -> logger.error("Subscriber onError {} : {}", t.getClass(), t.getMessage())
);

As expected, it fails and stops when it encounters 0:

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber onError class java.lang.ArithmeticException : / by zero

But what if I want the Observable (o2) to skip the exception?

I looked into RxJava's documentation on error handling, and there seems to be no way to skip an error. onErrorResumeNext() and onExceptionResumeNext() need a backup/fallback Observable, which is not what I want, and onErrorReturn() needs a return value to be specified.

None of these three error-handling methods can resume the original Observable. For example:

Observable<Integer> o2 = o1.map(i -> 12/i)
      .onErrorReturn(t -> 0);

It prints:

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber value 0

The remaining results, 12/3 and 12/4, are never printed.

The only solution seems to be handling the exception inside the map function:

Observable<Integer> o2 = o1.map(i -> {
  try {
    return Optional.of(12/i);
  } catch (ArithmeticException e) {
    return Optional.empty();
  }
}).filter(Optional::isPresent)
  .map(o -> (Integer) o.get());

It works, but it is cumbersome. I wonder if there is an easy way to skip any RuntimeException when manipulating an Observable (for example in map).
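One way to make the Optional approach less cumbersome is to move the try-catch into a reusable wrapper. The following is only a minimal sketch: `attempt` and `SkipErrors` are hypothetical names, and `java.util.stream` stands in for the Observable so the example runs without RxJava on the classpath. The same map / filter / map chain should carry over to an Observable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SkipErrors {

    // Hypothetical helper: wraps a function so that any RuntimeException
    // (and also a null result) becomes Optional.empty() instead of propagating.
    static <T, R> Function<T, Optional<R>> attempt(Function<T, R> f) {
        return t -> {
            try {
                return Optional.ofNullable(f.apply(t));
            } catch (RuntimeException e) {
                return Optional.empty();
            }
        };
    }

    // Divides 12 by each element and silently drops the elements that fail.
    static List<Integer> divideTwelveBy(List<Integer> ints) {
        Function<Integer, Optional<Integer>> safeDivide = attempt(i -> 12 / i);
        return ints.stream()
                .map(safeDivide)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [12, 6, 4, 3]
        System.out.println(divideTwelveBy(Arrays.asList(1, 2, 0, 3, 4)));
    }
}
```

The wrapper catches every RuntimeException, which is broader than the ArithmeticException caught above, so it can also hide genuine bugs. Narrowing the catch clause is probably wise in real code.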

The above is about skipping exceptions in the Observable. The following is about skipping exceptions in the Subscriber.

The situation is the same:

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4);
Observable<Integer> o1 = Observable.from(ints);
o1.subscribe(
  i -> logger.info("12 / {} = {}", i, 12 / i),
  t -> logger.error("{} : {}", t.getClass() , t.getMessage()),
  () -> logger.info("onCompleted")
);

It prints:

RxTest - 12 / 1 = 12
RxTest - 12 / 2 = 6
RxTest - class java.lang.ArithmeticException : / by zero

When an exception occurs in onNext, it triggers onError, and the subscriber no longer responds to any further data from the Observable. If I want the subscriber to swallow the exception, I have to try-catch the ArithmeticException inside onNext(). Is there a cleaner solution?
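For reference, the try-catch inside onNext() can at least be written once and reused. This is only a sketch under assumptions: `swallowing` and `SafeConsumer` are hypothetical names, and `java.util.function.Consumer` stands in for the onNext callback so that the example runs without RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class SafeConsumer {

    // Hypothetical helper: runs `action` and routes any RuntimeException to
    // `onFailure` instead of letting it escape from the callback.
    static <T> Consumer<T> swallowing(Consumer<T> action,
                                      Consumer<RuntimeException> onFailure) {
        return value -> {
            try {
                action.accept(value);
            } catch (RuntimeException e) {
                onFailure.accept(e);
            }
        };
    }

    // Feeds every element to a guarded callback and returns what was logged.
    static List<String> run(List<Integer> ints) {
        List<String> log = new ArrayList<>();
        Consumer<Integer> onNext = swallowing(
                i -> log.add("12 / " + i + " = " + (12 / i)),
                e -> log.add("skipped: " + e.getMessage()));
        ints.forEach(onNext);
        return log;
    }

    public static void main(String[] args) {
        // The element 0 is logged as "skipped: / by zero"; 3 and 4 are still processed.
        run(Arrays.asList(1, 2, 0, 3, 4)).forEach(System.out::println);
    }
}
```

Because the exception never leaves the callback, the source keeps delivering the remaining elements.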

It seems that when a Subscriber hits an error in onNext() that cannot be handled within onNext(), it has to stop, right? Is that a good design?

smallufo
  • The way I look at it, `onNext()` is equivalent to `Iterator.next()` - and when there's a problem iterating a collection - an exception is thrown and the iterator bails out (it doesn't "resume" iterating) - see [ConcurrentModificationException](http://docs.oracle.com/javase/7/docs/api/java/util/ConcurrentModificationException.html) for instance. That's the same kind of behavior we have here. That said, @benjchristensen might be able to shed more light on the subject – Nir Alfasi Feb 08 '15 at 05:27
  • The `Optional` mapping is probably my favorite way of handling this. It's pretty clear, and the filter eats events nicely. It's a pretty good way to encapsulate it. I'd have used a `Try` type in scala, and then filtered on that. – BeepDog Feb 21 '18 at 21:49

3 Answers

Observable.just(1, 2, 3, 0, 4)
            .flatMap(i -> Observable.defer(() -> Observable.just(12 / i))
                    .onErrorResumeNext(Observable.just(0)));

This is one way to go, though keep in mind that RxJava assumes an error is something truly unexpected, whereas a value of 0 is something you can expect. On the other hand, if you want to ignore the divide-by-zero exception, maybe you should filter/map your values before the division.
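Filtering before the division could look roughly like the sketch below. It uses `java.util.stream` as a stand-in so that it runs without RxJava, and `FilterBeforeDivide` is just a hypothetical name. On an Observable, the analogous chain would be a `filter` followed by a `map`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FilterBeforeDivide {

    // Drops zeros first, so the division can never throw ArithmeticException.
    static List<Integer> divideTwelveByNonZero(List<Integer> ints) {
        return ints.stream()
                .filter(i -> i != 0)
                .map(i -> 12 / i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [12, 6, 4, 3]
        System.out.println(divideTwelveByNonZero(Arrays.asList(1, 2, 3, 0, 4)));
    }
}
```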

krp
  • The only thing I would change here is `Observable.from(1, 2, 3, 0, 4)` to `Observable.just(1, 2, 3, 0, 4)`, because in 1.0.0 it wouldn't compile with `from`. – meddle Feb 09 '15 at 08:50

Try it this way:

Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty());

In the above code, error events are replaced by an empty stream and the original stream resumes. Consequently, errors are skipped.

findall
  • Oh, sorry, I hadn't noticed the version. But it's possible to implement an `onErrorFlatMap`-like combinator using `materialize` and `dematerialize`, although it is a little difficult. – findall Feb 08 '15 at 06:37
  • I am curious too why that method disappeared in 1.0. – smallufo Feb 08 '15 at 06:39
import rx.lang.scala._

import scala.concurrent.duration.DurationDouble
import scala.util.{Failure, Success, Try}

val o1 = List(1, 2, 3, 0, 4) toObservable
val o2 = List(1, 2, 3, 4, 5) toObservable

val o3 = o1 zip o2 map {case (i1, i2) => i2 / i1 } // this will trigger a division by 0 error

val o4 = o3 lift {  
      subscriber: Subscriber[Try[Int]] => new Subscriber[Int](subscriber) {
        override def onNext(v: Int) {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(Success(v))
        }

        override def onError(e: Throwable) {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(Failure(e))
        }

        override def onCompleted() {
          if (!subscriber.isUnsubscribed)
            subscriber.onCompleted()
        }
      }
    }

o4 subscribe(println(_))
boggy