This example method returns an Observable<Integer> that emits the numbers 1 to 9:
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.observables.SyncOnSubscribe;

public class Provider {
    public static Observable<Integer> test() {
        return Observable.create(SyncOnSubscribe.createStateless(new Action1<Observer<? super Integer>>() {
            @Override
            public void call(Observer<? super Integer> observer) {
                // Intended to emit 1 through 9 inclusive.
                for (int i = 1; i <= 9; i++)
                    // when i == 2, throws IllegalStateException:
                    // "onNext called multiple times!":
                    observer.onNext(i);
            }
        }));
    }
}
This test keeps only the numbers from 1 to 9 that are multiples of 3:
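import java.util.List;
import org.junit.Test;
import rx.functions.Func1;

public class RxJavaUnitTest {
    @Test
    public void rxJavaTest() {
        // Collect the filtered values into a list and block until the stream completes.
        List<Integer> multiplesOf3 = Provider.test().filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) {
                return i % 3 == 0;
            }
        }).toList().toBlocking().single();
    }
}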
But it throws IllegalStateException: "onNext called multiple times!". How can I provide more values to the observer if I can't call onNext more than once? Maybe SyncOnSubscribe.createStateless is the wrong method here and should be replaced with something else?
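To make the question concrete, here is a rough plain-Java sketch of what the error message seems to imply: the callback is invoked once per emitted item and may call onNext at most once per invocation, so the loop counter has to live outside the callback as state. This is only a model under that assumption, not RxJava itself; `OnePerCallSketch`, `SingleShotSink`, and `generate` are hypothetical names invented for illustration. If the assumption holds, the stateful factory methods on SyncOnSubscribe (such as createStateful) or simply Observable.range(1, 9) look like the candidates to try instead of createStateless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model, NOT RxJava: all names here are hypothetical.
class OnePerCallSketch {

    // Stand-in for an observer that accepts at most one value per callback call.
    static final class SingleShotSink {
        private final List<Integer> received = new ArrayList<>();
        private boolean emittedThisCall;
        private boolean completed;

        void onNext(int value) {
            if (emittedThisCall) {
                // Mirrors the message from the question.
                throw new IllegalStateException("onNext called multiple times!");
            }
            emittedThisCall = true;
            received.add(value);
        }

        void onCompleted() {
            completed = true;
        }
    }

    // Invokes `next` repeatedly, passing the state returned by the previous
    // call, until the callback signals completion.
    static List<Integer> generate(int initialState,
                                  BiFunction<Integer, SingleShotSink, Integer> next) {
        SingleShotSink sink = new SingleShotSink();
        int state = initialState;
        while (!sink.completed) {
            sink.emittedThisCall = false; // each new call may emit once
            state = next.apply(state, sink);
        }
        return sink.received;
    }

    // Emits 1 to 9, one value per callback call; the counter is the state.
    static List<Integer> oneToNine() {
        return generate(1, (i, sink) -> {
            if (i <= 9) {
                sink.onNext(i);
            } else {
                sink.onCompleted();
            }
            return i + 1;
        });
    }

    // Same filter as in the unit test above: keep multiples of 3.
    static List<Integer> multiplesOfThree() {
        List<Integer> result = new ArrayList<>();
        for (int i : oneToNine()) {
            if (i % 3 == 0) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(multiplesOfThree()); // prints [3, 6, 9]
    }
}
```

In this model, putting the whole for loop inside a single callback call fails on the second onNext, which matches the failure at i == 2 in the original code.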