
I'm looking to combine streams (observables) that start and end asynchronously:

-1----1----1----1---|->
     -2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->

What I need it for: Adding audio streams together. They're streams of audio "chunks", but I'm going to represent them with integers here. So there's the first clip playing:

-1----1----1----1---|->

and then a second one starts, a bit later:

     -2----2--|->

The result of combining them by sum should be:

-1----3----3----1---|->
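
To pin down that diagram, here is a rough model on plain arrays rather than real streams. It is only a sketch: `optionalZipModel`, `start` and `chunks` are made-up names, and each clip is assumed to begin at a known step index.

```javascript
// Hypothetical array model of the diagram above (not Rx code).
// Each clip has a `start` step and a list of `chunks`; at every step
// the chunks of all clips that are playing at that step are combined.
function optionalZipModel(clips, combine) {
    const end = Math.max(0, ...clips.map(c => c.start + c.chunks.length));
    const out = [];
    for (let step = 0; step < end; step++) {
        const playing = clips
            .filter(c => step >= c.start && step < c.start + c.chunks.length)
            .map(c => c.chunks[step - c.start]);
        // Assumption: steps where nothing is playing produce no output.
        if (playing.length > 0) {
            out.push(playing.reduce(combine));
        }
    }
    return out;
}

const mixed = optionalZipModel(
    [
        { start: 0, chunks: [1, 1, 1, 1] },
        { start: 1, chunks: [2, 2] },
    ],
    (a, b) => a + b
);
console.log(mixed); // [ 1, 3, 3, 1 ]
```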

But the standard zip completes as soon as any of the zipped streams ends. I want this optional_zip to keep going even if one of the streams ends. Is there any way of doing this in Rx, or do I have to implement it myself by modifying the existing Zip?

Note: I'm using RxPy, but the community here seems small and Rx operators seem to be pretty universal across languages, so I tagged it as rx-java and rx-js too.

uryga
  • Is [combineLatest](http://reactivex.io/documentation/operators/combinelatest.html) what you're looking for? – Egor Neliuba Mar 05 '16 at 18:25
  • combineLatest will keep using the last value a stream emitted before ending, which for my example would result in -1----3----6----6----6-|->. I want the operator to ignore a stream if it ended, and only zip the remaining ones. – uryga Mar 06 '16 at 16:46
  • @uryga Can you explain better what you want to get? If you want to zip elements with a sum selector then your diagram is wrong; the result would be -(1+2+3)---(1+2+3)---(1+3)--(1)--| – Oles Savluk Mar 06 '16 at 19:03
  • I think I cleared it up a bit. – uryga Mar 06 '16 at 19:22
  • @uryga But the chance that two (or more) observables will emit values at exactly the same moment is 0 (or you are running several threads). So do you want to combine elements by index (as the zip operator does), or by the time when they were emitted? – Oles Savluk Mar 06 '16 at 21:13
  • By index, I want to avoid using time-based solutions if possible because I feel they're more brittle in this case – uryga Mar 06 '16 at 21:16
  • Then you should correct your diagram. Now, after you have edited your question, the result will be -(1+2)---(1+2)---(1)---(1)--|->. – Oles Savluk Mar 06 '16 at 21:29
  • I know what you mean, but I'm looking for something that does what I described in the diagram... It should combine them by index only if there are two streams running, but if there's only one (as it is at the beginning) it should be just that stream. I know what I just wrote is a mess. I'm gonna go now and come back when I figure out how to formulate what I actually need. – uryga Mar 06 '16 at 21:42
  • To clarify, are you after something that operates on an observable of observables of x, and yields an observable of x that is produced by zipping all non-complete streams and aggregating values (by, e.g. summing them)? – Matt Burnell Mar 09 '16 at 04:01
  • @Matt Burnell yes! exactly. – uryga Mar 09 '16 at 08:06

2 Answers


I'd tackle this problem by breaking it into two parts. First, I'd want something that takes an Observable<Observable<T>> and produces an Observable<Observable<T>[]> where the array contains only the "active" (i.e. non-complete) observables. Any time a new element is added to the outer observable, and any time one of the inner observables completes, a new array would be emitted containing the appropriate observables. This is essentially a "scan" reduction of the primary stream.
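
As a rough illustration of that "scan" idea, the sketch below folds a list of start/complete events into successive snapshots of the active set, using plain arrays and string ids in place of real observables. The event shape and the name `activeSnapshots` are assumptions made for the example, not part of Rx.

```javascript
// Hypothetical model: events say which stream started or completed.
// The result lists the active set after each event, which is roughly
// what the Observable<Observable<T>[]> described above would emit.
function activeSnapshots(events) {
    const snapshots = [];
    let active = [];
    for (const event of events) {
        if (event.type === 'start') {
            active = active.concat(event.id);
        } else if (event.type === 'complete') {
            active = active.filter(id => id !== event.id);
        }
        // Copy so later events cannot change an earlier snapshot.
        snapshots.push(active.slice());
    }
    return snapshots;
}

const snapshots = activeSnapshots([
    { type: 'start', id: 'a' },
    { type: 'start', id: 'b' },
    { type: 'complete', id: 'b' },
    { type: 'complete', id: 'a' },
]);
console.log(snapshots); // [ [ 'a' ], [ 'a', 'b' ], [ 'a' ], [] ]
```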

Once you've got something that can do that, you can use flatMapLatest and zip to get what you want.

My basic attempt at the first part is as follows:

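function active(ss$) {
    // Emits a fresh array of the currently running inner streams
    // whenever that set changes.
    const activeStreams = new Rx.Subject();
    const elements = [];       // inner streams that have not completed yet
    const subscriptions = [];  // our subscriptions to them, kept for cleanup

    ss$.subscribe(s => {
        // Stays true unless `s` completes synchronously during subscribe().
        let include = true;
        const subscription = s.subscribe(() => {}, () => {}, () => {
            include = false;
            const i = elements.indexOf(s);
            if (i > -1) {
                // `s` finished: drop it and publish the remaining streams.
                elements.splice(i, 1);
                activeStreams.onNext(elements.slice());
            }
        });

        if (include) {
            elements.push(s);
            subscriptions.push(subscription);
            activeStreams.onNext(elements.slice());
        }
    });

    // Tie cleanup of the inner subscriptions to the returned observable.
    return Rx.Observable.using(
        () => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
        () => activeStreams
    );
}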

From there, you'd just zip it and flatten it out like so:

const zipped = active(c$).flatMapLatest(x =>
    x.length === 0 ? Rx.Observable.never()
  : x.length === 1 ? x[0]
  : Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);

I've made the assumptions that zero active streams should yield nothing, one active stream should yield its own elements, and two or more streams should all zip together (all of which is reflected in the map application).
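
Those three cases can be sketched on plain arrays, with each array standing in for the remaining items of one active stream. `zipActive` is a hypothetical helper for illustration only; like the real zip, it stops at the shortest input, which is why the active set has to be re-zipped whenever a stream completes.

```javascript
// Hypothetical array version of the three cases (not Rx code).
function zipActive(streams) {
    if (streams.length === 0) return [];                 // nothing active: emit nothing
    if (streams.length === 1) return streams[0].slice(); // one stream: pass it through
    // Two or more: combine by index, stopping at the shortest input.
    const length = Math.min(...streams.map(s => s.length));
    const out = [];
    for (let i = 0; i < length; i++) {
        out.push(streams.map(s => s[i]).reduce((a, c) => a + c));
    }
    return out;
}

console.log(zipActive([]));                  // []
console.log(zipActive([[1, 1, 1]]));         // [ 1, 1, 1 ]
console.log(zipActive([[1, 1, 1], [2, 2]])); // [ 3, 3 ]
```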

My (admittedly fairly limited) testing has this combination yielding the results you were after.

Great question, by the way. I've not seen anything that solves the first part of the problem (though I'm by no means an Rx expert; if someone knows of something that already does this, please post details).

Matt Burnell
  • i had a similar idea and already implemented that first `select_running` function you're talking about, but it's not pretty: pastebin.com/NdXJZeJL – uryga Mar 10 '16 at 08:37
  • as of now, it publishes an empty array every time there's no currently running streams, but that can be removed with one `if`. I can explain what's going on if you want, especially because there's 3 nested functions there... also, that's how half of the RxPy implementation looks like. – uryga Mar 10 '16 at 08:41
  • Answer updated. I took a look at your RxPy implementation and it looked similar in nature to my RxJs one. I'm not sure how to make this much prettier, since you have to track both the streams and your own subscriptions to them for proper cleanup. I'm not mutating anything that escapes the function, though. – Matt Burnell Mar 10 '16 at 08:52
  • I considered using a Subject too, but I went with how the built-ins were implemented - a subscription function that receives an Observer. that way I can write to the observer directly, without an intermediate Subject. then I turn that function into an AnonymousObservable, which simply takes the subscription function i gave it and applies it to any future subscribers. (explaining this because it took me a whole lot of time to actually figure out how the barely documented RxPy works) – uryga Mar 10 '16 at 09:02
  • Makes sense; I'm not yet very familiar with the innards of Rx, so I wrote it just using "public" constructs. I imagine your way is a bit faster. The subject is ok to demonstrate the concept, though. – Matt Burnell Mar 10 '16 at 09:11
  • thanks. i wish Python had arrow functions, I feel like it fits Rx really well - the nested function/closure definitions necessary to implement anything are pretty confusing, especially to newcomers. – uryga Mar 10 '16 at 09:22
  • Hey Matt, I have one question - what's the Disposable for? for the life of me, I can't figure out why they're in Rx. – uryga Mar 12 '16 at 20:57
  • Rx uses instances of Disposable as unsubscription handles; Observable.subscribe returns an instance of Disposable that will, when disposed, stop any further items being processed by the subscription. When Rx was implemented in .NET, the creators opted to stick with the existing disposal mechanism (rather than, say, returning an unsubscription lambda or token), a very good decision in my opinion. Not only did they do that, they added a bunch of additional abstractions (CompositeDisposable, SerialDisposable, Disposable.empty, etc.) that make clean up logic easy to express declaratively. – Matt Burnell Mar 12 '16 at 22:27
  • In this particular case, I'm using "using" to tie the lifetime of a disposable to usage of the returned observable. When there are no more subscribers to this observable, the disposable will be disposed, which will run the lambda that spins through and cleans up any subscriptions taken to the inner observables. This is important because the life of these inner observables may be much longer than the outer one, in which case the subscriptions would continue to consume processing time and hold memory despite not doing anything useful. – Matt Burnell Mar 12 '16 at 22:36
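
The idea behind those unsubscription handles can be shown with a toy version in plain JavaScript. This is not Rx's implementation; `createDisposable` and `createComposite` are invented names that only mimic the concept of a run-once cleanup handle and a group of such handles.

```javascript
// Toy cleanup handle: runs `action` the first time dispose() is called.
function createDisposable(action) {
    let disposed = false;
    return {
        dispose() {
            if (!disposed) {
                disposed = true;
                action();
            }
        },
    };
}

// Toy group: disposing it disposes every handle it was given.
function createComposite(disposables) {
    return createDisposable(() => disposables.forEach(d => d.dispose()));
}

const log = [];
const all = createComposite([
    createDisposable(() => log.push('inner 1 released')),
    createDisposable(() => log.push('inner 2 released')),
]);
all.dispose();
all.dispose(); // second call is a no-op
console.log(log); // [ 'inner 1 released', 'inner 2 released' ]
```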

So I got some code working that I think does most of what you need. Basically, I created a function zipAndContinue that will operate like zip, except it will continue emitting items as long as some of the underlying streams still have data to emit. This function has only been [briefly] tested with cold observables.

Also, corrections/enhancements/edits are welcome.

function zipAndContinue() {
    // Augment each observable so it ends with null
    const observables = Array.prototype.slice.call(arguments, 0).map(x => endWithNull(x));
    const combined$ = Rx.Observable.combineLatest(observables);

    // The first item from the combined stream is our first 'zipped' item
    const first$ = combined$.first();

    // We calculate subsequent 'zipped' item by only grabbing
    // the items from the buffer that have all of the required updated
    // items (remember, combineLatest emits each time any of the streams
    // updates).
    const subsequent$ = combined$
        .skip(1)
        .bufferWithCount(arguments.length)
        .flatMap(zipped)
        .filter(xs => !xs.every(x => x === null));

    // We return the concatenation of these two streams
    return first$.concat(subsequent$);
}

And here are the utility functions used:

function endWithNull(observable) {
    return Rx.Observable.create(observer => {
        return observable.subscribe({
            onNext: x => observer.onNext(x),
            onError: x => observer.onError(x),
            onCompleted: () => {
                observer.onNext(null);
                observer.onCompleted();
            }
        })
    })
}

function zipped(xs) {
    // For each buffered combineLatest result, count the non-null entries
    const nonNullCounts = xs.map(row => row.filter(x => x !== null).length);

    // The number of streams that are still emitting
    const stillEmitting = Math.max.apply(null, nonNullCounts);

    if (stillEmitting === 0) {
        return Rx.Observable.empty();
    }

    // Skip any intermittent results
    return Rx.Observable.from(xs).skip(stillEmitting - 1);
}

And here's sample usage:

const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);

zipAndContinue(one$, two$, three$)
    .subscribe(x => console.log(x));

// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]
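
For the audio use case in the question, each emitted array still has to be collapsed into a single value. One way, sketched here on plain arrays, is to drop the nulls before summing. The helper name `sumPresent` is made up, and null is assumed to mean "this stream has ended".

```javascript
// Hypothetical helper: sum only the entries of streams that are still running.
function sumPresent(tuple) {
    return tuple.filter(x => x !== null).reduce((a, c) => a + c, 0);
}

// Stand-in for the arrays emitted in the question's two-clip example.
const tuples = [[1, 2], [1, 2], [1, null], [1, null]];
console.log(tuples.map(sumPresent)); // [ 3, 3, 1, 1 ]
```

With the Rx code above, the same function could presumably be applied with `.map(sumPresent)` on the result of zipAndContinue.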

And here's a js-fiddle (you can click Run and then open the console): https://jsfiddle.net/ptx4g6wd/

Calvin Belden
  • never thought i'd see null terminated observables ;) – uryga Mar 10 '16 at 00:34
  • lol yea; a little hacky. this is definitely going to break if any of your source observables emit null before they complete. – Calvin Belden Mar 10 '16 at 00:38
  • still, thanks. it took me a few days to figure out how to write my own rx operators because the RxPy docs are nonexistent. gonna try a slightly different approach, but inspired by yours. – uryga Mar 10 '16 at 00:48
  • check out Matt Burnell's answer, we discuss a different way of doing it over there. – uryga Mar 10 '16 at 09:27