16

So I'm playing around with RX (really cool), and I've been converting my api that accesses a sqlite database in Android to return observables.

So naturally one of the problems I started to try to solve is, "What if I want to make 3 API calls, get the results, and then do some processing once they are all finished?"

It took me an hour or 2, but I eventually found the Zip Functionality and it helps me out handily:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Great! So that's cool.

So when I zip up the 3 observables they run in serial. What if I want them all to run in parallel at the same time so I end up getting the results faster? I've played around with a few things, and even tried reading some of the original RX stuff people have written in C#. I'm sure there is a simple answer. Can anyone point me in the right direction? What is the proper way to do this?

spierce7
  • 14,797
  • 13
  • 65
  • 106
  • If I may ask, why are you waiting to do processing until all three are finished? – Scott Baker Jan 20 '14 at 18:59
  • @ScottSEA sure, so set's say I've got a screen that requires 3 different elements from SQLite to properly draw, or 3 different pieces of network information. I'd want to make sure I had everything before I go ahead and draw the screen. – spierce7 Jan 20 '14 at 19:10
  • So you're not waiting until the sequences are *complete*, you're waiting until you have a value from each sequence? – Scott Baker Jan 20 '14 at 19:19
  • @ScottSEA the way I've setup my api is that everything only returns a single object and then completes. If you feel there are better ways to set this up, I'm all ears. I'm very new to RX and would love to hear anything you have to say. – spierce7 Jan 20 '14 at 19:25
  • @ScottSEA even if I'm returning multiple items I'd just send a single list of items instead of multiple onNext calls. – spierce7 Jan 20 '14 at 19:35

3 Answers3

20

zip does run the observables in parallel - but it also subscribes to them serially. Since your getNumberedObservable is completing in the subscription method it gives the impression of running serially, but there is in fact no such limitation.

You can either try with some long running Observables that outlive their subscription logic, such as timer, or use the subscribeOn method to subscribe asynchronously to each stream passed to zip.

James World
  • 29,019
  • 9
  • 86
  • 120
  • 1
    Ah! I can't believe I forgot to use subscribeOn. Thanks for pointing that out. I tested it and it works. So now I'm left with the question of what if I wanted them to all run serially, would I just subscribe to them all on a single thread, or is there a better way of combining all the observables together to run them serially? Thanks! – spierce7 Jan 18 '14 at 21:05
  • If you want to run them serially, you've probably come to the wrong API! :) Joking aside, you can do it, but it's fiddly. If the streams are all of the same type, you can chain them using `concat`, if they are all different then you could create a type to hold the n results and then use `concat` with `select` to project each stream's result to its place-holder in the type and use `scan` to accumulate a single result. You could also just subscribe to each successive stream in the onCompleted of the preceding. – James World Jan 19 '14 at 14:51
  • You can also use `selectMany` to project the result of a preceding stream into a query for the next. This works well if the result of one stream is passed into the next. – James World Jan 19 '14 at 14:55
  • Also, subscribing to them all on a single thread wouldn't be guaranteed to work - it's very common for subscribe method to return before an operator is finished. – James World Jan 19 '14 at 14:58
5

In RxJava, use toAsync to turn a regular function into something that will run on a thread and return its result in an observable.

I don't know Java syntax that well, but it would look something like:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

That would work if getNumber were really accessing a database. When you call getNumberedObservable it returns an observable that will run getNumber on a separate thread when you subscribe to it.

Brandon
  • 38,310
  • 8
  • 82
  • 87
4

I was trying to do the same, running multiple threads in parallel using the zip. I ended opening a new so question and got an answer. Basically, you have to subscribe each observable to a new thread, so if you want to run three observables in parallel using the zip, you have to have subscribe to 3 separate threads.

Community
  • 1
  • 1
s-hunter
  • 24,172
  • 16
  • 88
  • 130