This example method is meant to return an Observable<Integer> that emits the numbers 1 to 9:

import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.observables.SyncOnSubscribe;

public class Provider {
    public static Observable<Integer> test() {
        return Observable.create(SyncOnSubscribe.createStateless(new Action1<Observer<? super Integer>>() {
            @Override
            public void call(Observer<? super Integer> observer) {
                for (int i = 1; i <= 9; i++)
                    // On the second iteration (i == 2) this throws
                    // IllegalStateException: "onNext called multiple times!"
                    observer.onNext(i);
            }
        }));
    }
}

This test keeps only the numbers from 1 to 9 that are multiples of 3:

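import java.util.List;

import org.junit.Test;

import rx.functions.Func1;

public class RxJavaUnitTest {
    @Test
    public void rxJavaTest() {
        // Keep the multiples of 3 and block until the whole list is available.
        List<Integer> multiplesOf3 = Provider.test().filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) {
                return i % 3 == 0;
            }
        }).toList().toBlocking().single();
    }
}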

But it throws IllegalStateException: “onNext called multiple times!”. How can I provide more values to the observer if I can't call onNext more than once?

Maybe SyncOnSubscribe.createStateless is the wrong method here and should be replaced with something else?

Tar

1 Answer

Why not use Observable.from, Observable.fromCallable, Observable.defer or a similar variation instead of Observable.create?


However, to answer your question:

As documented in the SyncOnSubscribe.createStateless Javadoc:

... This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state value. This should be used when the next function closes over it's state.

This Javadoc is not very clear on the matter; more can be found in the SyncOnSubscribe.next Javadoc:

... To emit data to a downstream subscriber call observer.onNext(t). To signal an error condition call observer.onError(throwable) or throw an Exception. To signal the end of a data stream call observer.onCompleted(). Implementations of this method must follow the following rules.

  • Must not call observer.onNext(t) more than 1 time per invocation.
  • Must not call observer.onNext(t) concurrently.

The value returned from an invocation of this method will be passed in as the state argument of the next invocation of this method.

That means that in the callback action

  • there must be at most one onNext invocation per callback invocation
  • the only calls that may follow onNext within the same invocation are the terminal ones, onCompleted or onError

These rules are needed to keep emission safe in a concurrent environment (this is why it is called SyncOnSubscribe) and to support back-pressure.
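To make the rule concrete, here is a minimal plain-Java model of a driver that invokes a callback once per item and rejects a second onNext within the same invocation. It is only a sketch and not RxJava's implementation: the names OnePerCallModel, Sink and drain are hypothetical, and the "emitted nothing" guard is an assumption added so the model cannot spin forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of the "one onNext per invocation" rule; not RxJava code. */
class OnePerCallModel {

    /** Hypothetical stand-in for rx.Observer, reduced to two methods. */
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Invokes the callback repeatedly until it completes, collecting the values. */
    static <T> List<T> drain(Consumer<Sink<T>> next) {
        List<T> out = new ArrayList<>();
        boolean[] done = {false};
        while (!done[0]) {
            boolean[] emitted = {false}; // fresh flag for every invocation
            next.accept(new Sink<T>() {
                @Override
                public void onNext(T value) {
                    if (emitted[0]) {
                        // Second onNext inside the same invocation is rejected.
                        throw new IllegalStateException("onNext called multiple times!");
                    }
                    emitted[0] = true;
                    out.add(value);
                }

                @Override
                public void onCompleted() {
                    done[0] = true;
                }
            });
            if (!emitted[0] && !done[0]) {
                // Model-only guard: a callback that does nothing would loop forever.
                throw new IllegalStateException("callback emitted nothing");
            }
        }
        return out;
    }
}
```

In this model, a callback containing the question's for loop fails on its second onNext, while a callback that emits one value per invocation and eventually calls onCompleted drains normally.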

It is possible to fold the for loop into the callback by keeping a counter in the anonymous class:

return Observable.create(SyncOnSubscribe.createStateless(
        new Action1<Observer<? super Integer>>() {
            int counter = 0;

            @Override
            public void call(Observer<? super Integer> observer) {
                if (counter < 9) {
                    observer.onNext(counter++);
                } else {
                    observer.onCompleted();
                }
            }

        }));

Note the onCompleted invocation: without it, the Observable never completes and runs forever.

Because the counter starts at 0, applying the filter from the question to this produces the list {0, 3, 6}. But the code is ugly and violates the SyncOnSubscribe.createStateless contract, since the callback carries state. SyncOnSubscribe.createStateless is meant for stateless production, such as random values. Instead, one should use SyncOnSubscribe.createStateful:

return Observable.create(SyncOnSubscribe.createStateful(
        () -> 0,
        (counter, observer) -> {
            if (counter < 9) {
                observer.onNext(counter);
            } else {
                observer.onCompleted();
            }
            return counter + 1;
        }
));

However, the for loop still has to be unrolled into one step per callback invocation, and it is still necessary to invoke onCompleted.
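The way the state value travels from one invocation to the next can be sketched in plain Java. This is a simplified model under stated assumptions, not RxJava itself: StatefulModel, Sink and generate are hypothetical names, and the driver loop ignores back-pressure and the one-onNext check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Simplified model of a stateful generator; names are hypothetical, not RxJava's. */
class StatefulModel {

    /** Hypothetical stand-in for rx.Observer. */
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Feeds the state returned by one call of `next` into the following call. */
    static <S, T> List<T> generate(Supplier<S> initial, BiFunction<S, Sink<T>, S> next) {
        List<T> out = new ArrayList<>();
        boolean[] done = {false};
        Sink<T> sink = new Sink<T>() {
            @Override
            public void onNext(T value) {
                out.add(value);
            }

            @Override
            public void onCompleted() {
                done[0] = true;
            }
        };
        S state = initial.get();
        while (!done[0]) { // never ends if the callback never calls onCompleted
            state = next.apply(state, sink);
        }
        return out;
    }

    /** Same counter logic as the createStateful snippet, then the question's filter. */
    static List<Integer> multiplesOf3() {
        List<Integer> all = StatefulModel.<Integer, Integer>generate(
                () -> 0,
                (counter, sink) -> {
                    if (counter < 9) {
                        sink.onNext(counter);
                    } else {
                        sink.onCompleted();
                    }
                    return counter + 1;
                });
        List<Integer> result = new ArrayList<>();
        for (int i : all) {
            if (i % 3 == 0) {
                result.add(i);
            }
        }
        return result;
    }
}
```

Here multiplesOf3() returns [0, 3, 6], and removing the onCompleted branch would leave the while loop running indefinitely, which mirrors the warning above.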

bric3
  • Ok, got it. However I oversimplified my question... I need a solution to nested `for-each` loops, like: `for (NetworkInterface intf : interfaces) List addrs = Collections.list(intf.getInetAddresses()); for (InetAddress addr : addrs) { ... observer.onNext(addr); ... }` What I'm essentially trying to do here, is convert the `getIPAddress` method from [this answer](http://stackoverflow.com/questions/6064510/how-to-get-ip-address-of-the-device/13007325#13007325) to `RxJava`: I need an initializer part. So how can I handle this case? – Tar Jul 10 '16 at 10:50
  • @Tar Why not use [`flatMapIterable`](http://reactivex.io/RxJava/javadoc/rx/Observable.html#flatMapIterable(rx.functions.Func1))? Something like (pseudo code) `Observable.from(interfaces).flatMapIterable(intf -> Collections.list(intf.getInetAddresses()))` – bric3 Jul 10 '16 at 20:49
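If the nested for-each loops from the comment have to stay in a one-value-per-call shape, one possible approach is to turn them into a small state object that hands out a single element per call. The following plain-Java sketch only illustrates that idea: NestedLoopState, emitOne and drain are hypothetical names, and any Iterable of Iterables stands in for the interfaces and their addresses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Two nested for-each loops unrolled into "one element per call". */
class NestedLoopState<T> {
    private final Iterator<? extends Iterable<T>> outer; // position in the outer loop
    private Iterator<T> inner = Collections.emptyIterator(); // position in the inner loop

    NestedLoopState(Iterable<? extends Iterable<T>> groups) {
        this.outer = groups.iterator();
    }

    /** Emits at most one element; returns false once both loops are exhausted. */
    boolean emitOne(Consumer<? super T> onNext) {
        while (!inner.hasNext()) { // skip empty inner collections
            if (!outer.hasNext()) {
                return false; // the caller would signal completion here
            }
            inner = outer.next().iterator();
        }
        onNext.accept(inner.next());
        return true;
    }

    /** Drives the state until it is exhausted, collecting every element. */
    static <T> List<T> drain(Iterable<? extends Iterable<T>> groups) {
        NestedLoopState<T> state = new NestedLoopState<>(groups);
        List<T> out = new ArrayList<>();
        while (state.emitOne(out::add)) {
            // each iteration corresponds to one callback invocation
        }
        return out;
    }
}
```

An object like this could serve as the state value of a stateful generator, with emitOne called once per invocation and completion signalled when it returns false. The flatMapIterable suggestion above remains the shorter route when it fits.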