I've been trying to convert my onErrors into notifications in order to keep the stream emitting items. As far as I understood the materialize() operator does just that. So basically:
materialize() / dematerialize() are available to turn terminal events into Notification
So I made a test for this based on this question (How to continue streaming items after error in RxJava?). I tried the following:
@Test
public void materializeTest() {
final Observable<String> stringObservable = Observable.fromArray("1", "2", "3")
.flatMap(x -> {
if (x.equals("2")) {
return Observable.error(new NullPointerException());
}
return Observable.just(x);
})
.materialize()
.map(n -> n.getValue());
final TestObserver<String> testObs = stringObservable.test();
Java6Assertions.assertThat(testObs.values().size()).isEqualTo(2);
testObs.assertValueAt(0, "1");
testObs.assertValueAt(1, "3");
}
The result is that no more items are emitted after "2" gives the error. I've also tried to warp on my own Notification object (MyNotification<T>
) and do something like:
stringObs
.map(string -> MyNotification.success(string)
.onErrorReturn(error -> MyNotification.error())
But the end result is always the same: after "2" no more items are emitted. I'm 100% doing something wrong but can't really understand what is.