I've been trying to convert my onError events into notifications so that the stream keeps emitting items. As far as I understand, the materialize() operator does just that. So basically:

materialize() / dematerialize() are available to turn terminal events into Notification

So I made a test for this based on this question (How to continue streaming items after error in RxJava?). I tried the following:

@Test
public void materializeTest() {
    final Observable<String> stringObservable = Observable.fromArray("1", "2", "3")
            .flatMap(x -> {
                if (x.equals("2")) {
                    return Observable.error(new NullPointerException());
                }

                return Observable.just(x);
            })
            .materialize()
            .map(n -> n.getValue());

    final TestObserver<String> testObs = stringObservable.test();
    Java6Assertions.assertThat(testObs.values().size()).isEqualTo(2);

    testObs.assertValueAt(0, "1");
    testObs.assertValueAt(1, "3");
}

The result is that no more items are emitted after "2" produces the error. I've also tried wrapping the items in my own Notification object (MyNotification<T>) and doing something like:

stringObs
  .map(string -> MyNotification.success(string))
  .onErrorReturn(error -> MyNotification.error())

But the end result is always the same: after "2" no more items are emitted. I'm sure I'm doing something wrong, but I can't work out what it is.

Peddro

1 Answer

With flatMap, if one of the inner Observables fails, the whole sequence is terminated and no further upstream items are transformed. That happens before materialize() even gets involved.

So instead of trying to materialize the merged flow, materialize the inner sources individually:

Observable.fromArray("1", "2", "3")
        .flatMap(x -> {
            if (x.equals("2")) {
                return Observable.<String>error(new NullPointerException())
                                 .materialize();
            }

            return Observable.just(x)
                             .materialize();
        })
        .filter(n -> n.isOnNext())
        .map(n -> n.getValue());
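
One way to picture the difference is a plain-Java analogy with no RxJava involved. Materializing the merged flow behaves roughly like a single try/catch around the whole loop, whereas materializing each inner source behaves roughly like a try/catch inside the loop body. The sketch below is only a model: the class name and the transform, catchOutside and catchInside helpers are hypothetical and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ErrorScopeAnalogy {

    // Hypothetical stand-in for the flatMap body: fails for "2".
    public static String transform(String x) {
        if (x.equals("2")) {
            throw new NullPointerException("failed on " + x);
        }
        return x;
    }

    // Roughly like materialize() applied after flatMap: one catch around
    // the whole loop, so the first failure ends the iteration.
    public static List<String> catchOutside(List<String> input) {
        List<String> out = new ArrayList<>();
        try {
            for (String x : input) {
                out.add(transform(x));
            }
        } catch (RuntimeException e) {
            // The error is observed here, but the loop is already over.
        }
        return out;
    }

    // Roughly like materialize() applied to each inner source: the failure
    // is contained per item, so later items are still processed.
    public static List<String> catchInside(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String x : input) {
            try {
                out.add(transform(x));
            } catch (RuntimeException e) {
                // Only this item is dropped; the loop continues.
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1", "2", "3");
        System.out.println(catchOutside(input)); // prints [1]
        System.out.println(catchInside(input));  // prints [1, 3]
    }
}
```

The second variant corresponds to the flatMap above: the error is turned into data before it can reach the outer sequence, so "3" is still processed.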
akarnokd
  • I've tried this but it still fails the test. I was able to pass the test by doing the map and onErrorReturn to "MyNotification" inside the flatMap (basically what you suggested, but without materialize()). I can post that working solution. – Peddro Jan 24 '18 at 15:33
  • Ah nice! :D Thank you so much. I understand the issue :) – Peddro Jan 24 '18 at 17:22