8

Consider using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other.
The current implementation is loss-less: if I keep these Observables emitting for an hour and then swap their emission rates, the first Observable will eventually catch up with the other.
This will cause a memory explosion at some point, as the buffer grows larger and larger.
The same will happen if the first Observable emits items for several hours and the second emits a single item at the end.

How do I achieve lossy behavior for this operator? I just want to emit any time I get emissions from both streams, and I don't care how many emissions from the faster stream I miss.

Clarifications:

  • The main problem I'm trying to solve here is the memory explosion caused by the loss-less nature of the zip operator.
  • I want to emit any time I get emissions from both streams, even if both streams emit the same value every time.

Example:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

Regular zip will produce the following output:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

The output I'd like it to produce:

[1, 10]
[3, 20]
[5, 30]

Explanation:
A lossy zip operator is zip with buffer size 1. That means it will only keep the first item from the stream that emitted first and will lose all the rest (items that arrive between the first item and the first emission from the second stream). So what happens in the example is the following: stream1 emits 1, lossy zip "remembers" it and ignores all the items on stream1 until stream2 emits. The first emission of stream2 is 10, so stream1 loses 2. After the mutual emission (the first emission of lossy zip) it starts over: "remember" 3, "lose" 4, emit [3,20]. Then start over: "remember" 5, "lose" 6 and 7, emit [5,30]. Then start over: "remember" 40, "lose" 50, 60, 70 and wait for the next item on stream1.

Example 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

Regular zip operator will explode the memory in this case.
I don't want it to.

Summary:
Essentially I expect the lossy zip operator to remember only the first value emitted by stream 1 after the previous mutual emission and to emit when stream 2 catches up with stream 1. And repeat.
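To make those semantics concrete, here is a minimal plain-JavaScript sketch (no RxJS; `createLossyZip` is a made-up helper name, for illustration only) of a buffer-size-1 zip, replayed against the example above:

```javascript
// A sketch of the desired "lossy zip": each stream buffers at most one value,
// and both buffers are reset after every mutual emission.
function createLossyZip(emit) {
  let first1, first2;
  let has1 = false, has2 = false;   // buffer-of-size-1 occupancy flags
  function tryEmit() {
    if (has1 && has2) {
      emit([first1, first2]);
      has1 = has2 = false;          // reset both buffers after a mutual emission
    }
  }
  return {
    // later values are dropped while the buffer is occupied
    next1(v) { if (!has1) { first1 = v; has1 = true; tryEmit(); } },
    next2(v) { if (!has2) { first2 = v; has2 = true; tryEmit(); } },
  };
}

// Replaying the question's sequence:
const out = [];
const lz = createLossyZip(pair => out.push(pair));
lz.next1(1); lz.next1(2); lz.next2(10);
lz.next1(3); lz.next1(4); lz.next2(20);
lz.next1(5); lz.next1(6); lz.next1(7); lz.next2(30);
lz.next2(40); lz.next2(50); lz.next2(60); lz.next2(70);
console.log(out); // [[1,10],[3,20],[5,30]] — 40 stays buffered, waiting for stream1
```

Memory usage stays bounded at one buffered value per stream no matter how far apart the emission rates drift.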

JeB
  • Have a look at `combineLatest` – martin Oct 06 '17 at 09:31
  • `combineLatest` doesn't do what I want. It emits everytime **one** of the streams emits. I need it to emit everytime **both** of the streams emit. Basically I want `zip` operator with buffer size 1. – JeB Oct 06 '17 at 09:33

5 Answers

10

The following will give you the desired behavior:

Observable.zip(s1.take(1), s2.take(1)).repeat()

In RxJs 5.5+ pipe syntax:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

Explanation:

  • repeat operator (in its current implementation) resubscribes to the source observable upon the latter's completion, i.e. in this particular case it resubscribes to zip upon every mutual emission.
  • zip combines two observables and waits for both of them to emit. combineLatest would do as well; it doesn't really matter here because of take(1)
  • take(1) is what actually prevents the memory explosion and defines the lossy behavior

If you want to take the last instead of the first value from each stream upon mutual emission, use this:

Observable.combineLatest(s1, s2).take(1).repeat()

In RxJs 5.5+ pipe syntax:

combineLatest(s1, s2).pipe(take(1), repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
JeB
  • Kindly note, your answer does not emit the `[7, 40]` pair, as stated in your question. Why is this? – Richard Matsen Oct 08 '17 at 19:12
  • Right, you're correct, it shouldn't emit the `[7, 40]` pair. If the operator emits the first items in a mutual emission, the pair for `40` is not yet on the stream at this point. I'll fix the example. – JeB Oct 08 '17 at 19:32
  • How does the [7,40] differ from the other pairs? – Richard Matsen Oct 08 '17 at 19:35
  • Lossy `zip` operator is `zip` with buffer size `1`. That means it will only keep the first item from the stream that emitted first and will lose all the rest (items that arrive between the first item and the first emission from the second stream). So what happens in the example is the following: `stream1` emits `1`, lossy `zip` "remembers" it and ignores all the items on `stream1` until `stream2` emits. The first emission of `stream2` is `10` so `stream1` loses `2`. After the mutual emission (the first emission of lossy `zip`) it starts over... – JeB Oct 08 '17 at 19:41
  • ... it will "remember" `3`, "lose" `4`, emit `[3,20]`. Then start over: "remember" `5`, "lose" `6` and `7`, emit `[5,30]`. Then start over: "remember" `40`, "lose" `50`,`60`,`70` and wait for the next item on `stream1`. Have I missed something? – JeB Oct 08 '17 at 19:44
  • Thanks, that explains it. Might be useful to add that to the question. – Richard Matsen Oct 08 '17 at 19:50
  • 1
    Shouldn't the RxJS 5.5+ syntax for taking the last be combineLatest(s1, s2).pipe(take(1), repeat())? Taking one from each outer observable would have the same effect as taking one with zip. Therefore, I think combineLatest can be used in both cases. See the following. https://stackblitz.com/edit/rxjs-4jae8r – Trevor Karjanis Jul 07 '20 at 18:03
1

I think the following should always take the last value from each source Observable.

const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();

source1.connect();
source2.connect();

Observable.defer(() => Observable.forkJoin(
        source1.takeUntil(source2.skipUntil(source1)),
        source2.takeUntil(source1.skipUntil(source2))
    ))
    .take(1)
    .repeat()
    .subscribe(console.log);

Live demo: http://jsbin.com/vawewew/11/edit?js,console

This prints:

[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]

You might need to turn source1 and source2 into hot Observables if they aren't already.

Edit:

The core part is source1.takeUntil(source2.skipUntil(source1)). This takes values from source1 until source2 emits. But thanks to skipUntil, the notifier ignores source2 until source1 has emitted at least one value :).

The forkJoin() Observable waits until both sources complete while remembering the last emission from each of them.

Then we want to repeat the process and so we use take(1) to complete the chain and .repeat() to resubscribe immediately.
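For comparison, the "take the last value from each stream upon mutual emission" semantics this chain aims for can be modeled in plain JavaScript (no RxJS; `createLossyLastZip` is a made-up helper name). Note this is the idealized behavior; as discussed in the comments below, the actual operator chain lags one pair behind this model:

```javascript
// Idealized "lossy forkJoin": keep the latest value from each stream,
// emit when both have produced something since the last mutual emission.
function createLossyLastZip(emit) {
  let last1, last2;
  let has1 = false, has2 = false;   // "fresh since last emission" flags
  function tryEmit() {
    if (has1 && has2) {
      emit([last1, last2]);
      has1 = has2 = false;          // reset after each mutual emission
    }
  }
  return {
    // always overwrite: we want the latest value, not the first
    next1(v) { last1 = v; has1 = true; tryEmit(); },
    next2(v) { last2 = v; has2 = true; tryEmit(); },
  };
}

// Replaying the question's sequence:
const out = [];
const lz = createLossyLastZip(pair => out.push(pair));
lz.next1(1); lz.next1(2); lz.next2(10);                 // emits [2, 10]
lz.next1(3); lz.next1(4); lz.next2(20);                 // emits [4, 20]
lz.next1(5); lz.next1(6); lz.next1(7); lz.next2(30);    // emits [7, 30]
lz.next2(40); lz.next2(50); lz.next2(60); lz.next2(70); // stream1 silent: nothing
console.log(out); // [[2,10],[4,20],[7,30]]
```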

martin
  • That's a tricky one! Thanks! But won't it always emit the first combined emission? I mean `takeUntil` will only take the first value of each of the streams and `defer` will reset on every new subscriber. But what about single subscriber that wants to get all the combined emissions? And why do we need `take(1)` and `repeat` here? Won't it create an infinite stream of the first combined emission? – JeB Oct 06 '17 at 10:09
  • 1
    See the edit. If you don't want to start emitting from the beginning you need to turn the sources into hot Observables. – martin Oct 06 '17 at 10:27
  • 1
    Now it's almost clear. "Then we want to repeat the process and so we use `take(1)` to complete the chain and `.repeat()` to resubscribe immediately" - this also explains the `defer` operator. But why `repeat` will "resubscribe immediately"? According to the docs: "Generates an observable sequence that repeats the given element the specified number of times, using the specified scheduler to send out observer messages". How is it resubscribing? – JeB Oct 06 '17 at 10:33
  • 1
    I admit that's a really confusing description. It seems like it repeats previous emissions but that's not what it does. It in fact just resubscribes to its source Observable after receiving `complete` notification. You can see it here https://github.com/ReactiveX/rxjs/blob/master/src/operators/repeat.ts#L52 – martin Oct 06 '17 at 12:00
  • Well, either the description has to be corrected or it's implementation dependent and you actually cannot assume it works this way. Thanks for explanation, I learn a lot from your answers. – JeB Oct 06 '17 at 13:04
  • how about `defer(zip).take(1).repeat`? Won't it do the same thing with less operators? – JeB Oct 06 '17 at 15:25
  • 1
    @meltedspark It's not. It would always emit only the first item from each source Observable. With `forkJoin` it emits the last. – martin Oct 06 '17 at 15:30
  • Right. But if the only thing I care about is the fact of the emissions (I just want to know when both streams emitted), `zip` would do the job wouldn't it? – JeB Oct 06 '17 at 15:42
  • 1
    I think it would but I didn't test it – martin Oct 06 '17 at 15:43
  • Sticking to the original question, which states "lossy form of **zip** operator", wouldn't `defer(zip).take(1).repeat` be the right solution? While your solution is the lossy form of `forkJoin`? – JeB Oct 06 '17 at 16:07
  • I guess it would – martin Oct 06 '17 at 16:10
  • As I don't want to take the credit for your answer I suggest you to edit it to `zip` version, while mentioning that the correct version for `forkJoin` is the one you suggested and I'll accept your answer. Otherwise I'll have to answer with `zip` version although you're the one who found the solution. – JeB Oct 06 '17 at 16:15
  • I've updated the question with concrete examples, please see if you can update your answer so that I can accept it. – JeB Oct 08 '17 at 10:26
  • I've changed your solution a bit to fit the example described in the question: http://jsbin.com/zurezubejo/edit?js,console . It works but it looks like it's one emission behind - if I was expecting to get last value from each stream upon mutual emission, I'd expect to get `[2,10], [4,20], [7,30]`, but what I'm actually getting is just `[2,10], [4,20]`. I think though I came up with a simpler solution that works as expected (using `combineLatest` and `take(1)` inside `defer`). Check out my answer and let me know if you think there is a mistake. – JeB Oct 08 '17 at 14:06
  • One more thing. It seems that you don't really need `defer` here. `take(1)` completes the observable and `repeat` resubscribes. Checked it in my solution, it works without `defer`, no reason yours won't work. – JeB Oct 10 '17 at 07:59
1

This gives the sequence [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...

const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

with arguably fewer operators. Not sure how efficient it is though.
CodePen

For update #3, you'd need a key for each source item.

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

This produces

["x", "a"]  
["y", "a"]  
["z", "b"]  

CodePen (refresh CodePen page after opening console, for better display)

Richard Matsen
  • I think this solution is better than mine. – martin Oct 06 '17 at 12:35
  • but inspired by your skipUntil() – Richard Matsen Oct 06 '17 at 12:39
  • This solves quite a different problem. Your solution won't emit every time both streams emitted. It will emit every time both streams have emitted values different from the previous ones. In my case I don't want `distinctUntilChanged`. I do want to emit values even they are the same. Let's just assume both Observables emit `1` every time. The first one emitted 10 times and the second emitted 5 times. I'd like to get 5 emissions without "remembering" the 5 redundant emissions. Hope it's clear now. – JeB Oct 06 '17 at 15:04
  • Added a clarification for this matter in the question. – JeB Oct 06 '17 at 15:10
  • Yep, now (after you added the key) it will work. The only problem is that it solves a problem for `forkJoin` operator (take the latest) and not for `zip` (take the first). It also becomes quite overwhelmed as compared to @martin's answer. I have updated the question with concrete examples. – JeB Oct 08 '17 at 11:13
0

You mention buffer size 1, wondering if zipping two ReplaySubjects with buffer size 1 will do it?

Richard Matsen
  • I cannot make assumptions on what kind of `Subject` the producer is. I may of course wrap the source `Observables` with `ReplaySubject` but it won't help at all. The purpose of `ReplaySubject` is quite different: essentially it's emitting to any observer all of the items that were emitted by the source `Observable`, regardless of when the observer subscribes. – JeB Oct 06 '17 at 09:53
  • 'essentially it's emitting to any observer all of the items that were emitted by the source Observable' - up to the limit of buffer size, no? So I'm thinking the rate of zipping would be that of the slowest source. I'd test it for you, but afraid it's late evening for me. – Richard Matsen Oct 06 '17 at 09:59
0

I'm adding another answer for clarity, as it comes after the accepted answer (but builds on my previous answer).

Forgive me if I've misunderstood, but I was expecting the solution to handle switching emission rates:

then I switch between their emitting rates,

The test supplied doesn't switch emission rate until after the first stream stops,

Stream1: 1 2    3 4    5 6 7                 
Stream2:     10     20    30 40 50 60 70

so I've tried another test

Stream1: 1 2      3 4     5 6
Stream2:    10 20    30 40   50 60

The test data for this stream is

s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6);  s2.next(50); s2.next(60);

From my understanding, the accepted answer fails this test.
It outputs

[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]

whereas I'd expect to see

[1, 10]
[3, 30]
[5, 50]

if the operator is to be symmetrical (commutative?)

Enhancing my previous answer

This solution is built from basic operators, so it is arguably easier to understand. I can't speak to its efficiency; perhaps I'll test that in another iteration.

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])

or in more compact form if preferred

let index = 0
const output = 
  s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
  .scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged()       //fresh ones only
  .map(keyed => keyed[0])       // de-key
  .pairwise()                   // pair
  .map(x=>[x,index++])          // add a sequence no
  .filter(x => x[1] % 2 === 0)  // take even sequence
  .map(x=>x[0])                 // deindex

For testing, CodePen (refresh CodePen page after opening console, for better display)
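If I've read the pipeline right, the same steps (scan keeping the first of each same-stream run, distinctUntilChanged, de-key, pairwise, keep alternate pairs) can be modeled on plain arrays, run against the test data above — a sketch for illustration, not the operator chain itself:

```javascript
// Merged emissions from the new test, tagged [value, streamId]:
const events = [
  [1, 1], [2, 1], [10, 2], [20, 2], [3, 1], [4, 1],
  [30, 2], [40, 2], [5, 1], [6, 1], [50, 2], [60, 2],
];

// scan: hold the first event of each run of same-stream emissions
const scanned = [];
let acc;
for (const x of events) {
  acc = acc === undefined || x[1] !== acc[1] ? x : acc;
  scanned.push(acc);
}

// distinctUntilChanged: repeated accumulators are the same object reference
const fresh = scanned.filter((x, i) => i === 0 || x !== scanned[i - 1]);

// de-key, pairwise, then keep every even-indexed pair
const values = fresh.map(x => x[0]);           // [1, 10, 3, 30, 5, 50]
const pairs = values.slice(1).map((v, i) => [values[i], v]);
const output = pairs.filter((_, i) => i % 2 === 0);
console.log(output); // [[1,10],[3,30],[5,50]]
```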

Richard Matsen
  • Thanks for investing your time, but "switching between emission rates" is mentioned in the question just for emphasizing the current loss-less behavior. It is mentioned in the section where I'm describing the **problem**, not the question. I thought I stated pretty clearly what is my question exactly didn't I? The examples and clarifications are not clear enough? – JeB Oct 12 '17 at 04:54
  • Also it would be helpful if you explain why do you expect to see this output (just like I did in the example in the question). Because as for now the first output seems much more logical than the second one. – JeB Oct 12 '17 at 05:23
  • The thing is that lossy `zip` should emit every time both streams emit without remembering the items in between. In your example first time the two streams emit is when the first emitted 1, lost the 2 and the second emitted 10. This is fine. But then next time both of them emit is when the second emits 20 and the first emits 3. After each mutual emission it starts waiting for next mutual emission. Or didn't I get your point? – JeB Oct 12 '17 at 05:28
  • Well, it is your problem definition so I'm not going to argue against it _too_ much :). – Richard Matsen Oct 12 '17 at 09:50
  • My main perception is that (in the case of my new test), if stream1 emits 1 and drops 2, then for symmetry when stream2 takes the lead it will do likewise (emit its first and drop all subsequent until the other stream emits). – Richard Matsen Oct 12 '17 at 09:51
  • I can see you're saying that 10 has been 'used' so now 20 is on the queue waiting for next stream1 item. This is also fair, but then should logic not dictate 4 be in the emission, since 3 has been 'used' so 4 is now on the queue waiting for the next stream2 item? – Richard Matsen Oct 12 '17 at 09:51
  • Correcting the last - I can see you're saying that 10 has been 'used' so now 20 is on the queue waiting for next stream1 item. This is fair, but it means that stream2 is not behaving exactly like stream1 in terms of the items it drops – Richard Matsen Oct 12 '17 at 09:58
  • I'm not sure about what you mean when you say "stream2 is not behaving exactly like stream1". What is the difference? As for me, it behaves just like plain `zip` but without memory function. It fires every time both streams emit and then starts over. Probably the easier way to think of what I want is that both streams memory is reset after mutual emission. – JeB Oct 12 '17 at 15:20