
I'm trying to understand how values are transmitted through the pipe in RxJS.
I know that this should not be a concern because that's the whole idea with async streams - but still there's something I want to understand.

Looking at this code :

var source = Rx.Observable
    .range(1, 3)
    .flatMapLatest(function (x) {  // called `switchMap` these days...
        return Rx.Observable.range(x*100, 2);
    });


 source.subscribe(value => console.log('I got a value ', value))

Result :

I got a value 100
I got a value 200
I got a value 300
I got a value 301

I believe (IIUC) that the diagram is something like this (notice the struck-through 101 and 201, which are unsubscribed):

----1---------2------------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------

And here is the question:

Question:

Is it always guaranteed that 2 will arrive before (101)? And likewise that 3 will arrive before (201)?

I mean: if I'm not supposed to rely on the timeline, then it should be perfectly legal for the following diagram to occur:

----1---------------2---------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------

Here 2 arrives with a slight delay, after 101 has already been emitted.

What am I missing here? How does the pipe work in this case?

Royi Namir

2 Answers

1

I suppose you already understand the 'pipe', as you call it. In any case, it is still worth reviewing how the data flows down the chain of subscriptions: Hot and Cold observables : are there 'hot' and 'cold' operators?

What is not addressed in that answer is the scheduling of the dataflow. Data are indeed emitted sequentially; that is part of the contract. However, the timing of the emissions is defined by the scheduler used for that observable. Each operator has a sensible default, so most of the time we don't even have to think about scheduling.

It is hard to know for sure what happens here, but the best guess is that range emits all its values on the Rx.Scheduler.currentThread scheduler, which schedules work as soon as possible on the current thread.

Scheduler.Immediate will ensure the action is not scheduled, but rather executed immediately. Scheduler.CurrentThread ensures that the actions are performed on the thread that made the original call. This is different from Scheduler.Immediate, as CurrentThread will queue the action to be performed.
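
To make the difference in that quote concrete, here is a minimal sketch of the two strategies. These are toy schedulers written for illustration, not the RxJS implementations; `immediate`, `createTrampoline` and `run` are hypothetical names.

```javascript
// Toy schedulers for illustration only (NOT the RxJS implementations).
const log = [];

// "Immediate": run the action right now, even in the middle of another action.
const immediate = {
  schedule(action) { action(); }
};

// "CurrentThread" (trampoline): queue the action; only the outermost call
// drains the queue, so nested actions wait until the current one returns.
function createTrampoline() {
  const queue = [];
  let draining = false;
  return {
    schedule(action) {
      queue.push(action);
      if (draining) return;            // nested call: just enqueue
      draining = true;
      while (queue.length > 0) queue.shift()();
      draining = false;
    }
  };
}

// Schedule an outer action that schedules an inner action while it runs.
function run(scheduler, label) {
  scheduler.schedule(() => {
    log.push(label + ': outer start');
    scheduler.schedule(() => log.push(label + ': inner'));
    log.push(label + ': outer end');
  });
}

run(immediate, 'immediate');
run(createTrampoline(), 'trampoline');
console.log(log.join('\n'));
```

With the immediate strategy the inner action runs in the middle of the outer one; with the trampoline it runs only after the outer action has returned. Both are still synchronous from the caller's point of view.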

So :

  • 1 is emitted
  • flatMapLatest creates the Rx.Observable.range(x*100, 2) observable and subscribes to it, which leads to the emission of 100 and the scheduling of the emission of 101.
  • Before that happens, 2 is emitted, so the 101 is dropped.
  • The same happens with 3, but then there are no new values, so nothing prevents 301 from being received before the stream ends.

This can be observed in the following jsfiddle : http://jsfiddle.net/ukhtwwcz/

As for exactly WHY it behaves like this in the details, I can't say for sure.

user3743222
  • OK - if so - if it yields value synchronously , how does it change the overall picture ? – Royi Namir Apr 14 '17 at 21:38
  • Alright, I checked the doc; apparently the scheduler is `currentThread`. But I don't really know what "as soon as possible" means. I guess it means that, like promises, the value is emitted right after the end of the current tick and before the next tick. Not sure how to explain that. – user3743222 Apr 14 '17 at 21:44
  • I will ask it the other way: if someone gave you the code and expected you to show the output, would it always be the first diagram? I'm trying to understand why my figure #2 can't/doesn't happen. – Royi Namir Apr 14 '17 at 21:47
  • yes, it will always be the same outputs for the same inputs – user3743222 Apr 14 '17 at 21:50
  • Sorry, but I still don't understand your point #3: "_Before that happens, 2 is emitted so the 101 is dropped_". What about the situation where 2 arrives AFTER 101? Where does it say that 2 always comes before 101? (And if it has to do with sync, then how does sync affect that?) – Royi Namir Apr 14 '17 at 21:51
  • sorry I updated my answer, the scheduler is not `Immediate` as I thought it was, it is the `currentThread`, so the values are not emitted synchronously, they are queued apparently (i.e. scheduled to be processed). What seems to happen is that the `2` apparently is scheduled for processing before the `101`. – user3743222 Apr 14 '17 at 21:54
  • Your last line is actually describing my question : _What seems to happen is that the 2 apparently is scheduled for processing before the 101_ - which I'm trying to understand - by what rule ? – Royi Namir Apr 14 '17 at 21:59
  • Schedulers behaviour is the least documented part of rxjs. This might help : http://stackoverflow.com/questions/31015278/schedulers-immediate-vs-currentthread. Also a few members of the rxjs team hang around on so, so hopefully they will be able to provide a more understandable answer. – user3743222 Apr 14 '17 at 22:04
1

For this particular Observable chain with this particular RxJS version, the order of emissions will always be the same.

As already mentioned, in RxJS 4 it uses the currentThread scheduler as you can see here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39.
All schedulers (except the immediate from RxJS 4) are internally using some type of a queue so the order is always the same.

The order of events is very similar to what you showed in the diagram (... or at least I think it is):

  1. 1 is scheduled and emitted because it's the only action in the queue.
  2. 100 is scheduled. At this point there are no more actions in the Scheduler's queue because 2 hasn't been scheduled yet. The RangeObservable schedules its next emission recursively after it calls onNext(). This means that 100 is scheduled before 2.
  3. 2 is scheduled.
  4. 100 is emitted, 101 is scheduled
  5. 2 is emitted, 101 is disposed.
  6. ... and so on
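
The steps above can be reproduced with a small simulation. This is only a sketch under assumptions: a single FIFO queue stands in for the currentThread scheduler, `range` queues one emission at a time (as the steps describe), and completion handling is left out. The names `enqueue`, `range`, `received` and `disposeInner` are made up for this example and are not RxJS APIs.

```javascript
// Toy model of a trampoline ("current thread") queue. Not RxJS code.
const queue = [];
let draining = false;

function enqueue(task) {
  queue.push(task);
  if (draining) return;                  // nested call: the outer loop will get to it
  draining = true;
  while (queue.length > 0) {
    const next = queue.shift();
    if (!next.cancelled) next.run();     // disposed (cancelled) work is skipped
  }
  draining = false;
}

// A range that emits ONE value per queued task and then queues its next step.
// Returns a dispose function that cancels the step that is still pending.
function range(start, count, onNext) {
  let pending = null;
  let disposed = false;
  function step(i) {
    pending = {
      cancelled: false,
      run() {
        if (i >= count) return;          // nothing left to emit
        onNext(start + i);
        if (!disposed) step(i + 1);      // schedule the next emission
      }
    };
    enqueue(pending);
  }
  step(0);
  return () => { disposed = true; pending.cancelled = true; };
}

const received = [];
let disposeInner = null;

// Outer range plus "switch to the latest inner range" logic.
range(1, 3, (x) => {
  if (disposeInner) disposeInner();      // drop the previous inner range
  disposeInner = range(x * 100, 2, (v) => received.push(v));
});

console.log(received.join(', '));        // 100, 200, 300, 301
```

In this model the ordering is deterministic because everything goes through one FIFO queue: the outer range's next step is always queued before the inner range gets to queue its second value, so 2 is processed before 101 could be.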

Note that this behavior is different in RxJS 4 and RxJS 5.

In RxJS 5, most Observables and operators don't use any Scheduler by default (the obvious exceptions are Observables/operators that need to work with delays). So in RxJS 5 the RangeObservable doesn't schedule anything and starts emitting values right away in a loop.

The same example in RxJS 5 will produce a different result:

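// RxJS 5.x; 'rxjs/Rx' patches all operators onto Observable
const { Observable } = require('rxjs/Rx');

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2);
  });

source.subscribe(value => console.log('I got a value ', value));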

This will print the following:

I got a value  100
I got a value  101
I got a value  200
I got a value  201
I got a value  300
I got a value  301
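
Without a scheduler, the chain behaves roughly like two nested loops. A minimal sketch of that idea in plain JavaScript (`rangeSync` and `receivedSync` are hypothetical names, not RxJS APIs):

```javascript
// Toy synchronous range: emits every value in a plain loop. Not RxJS code.
function rangeSync(start, count, onNext) {
  for (let i = 0; i < count; i++) onNext(start + i);
}

const receivedSync = [];

rangeSync(1, 3, (x) => {
  // The inner loop runs to completion before the outer loop
  // can produce its next value, so there is nothing left to cancel.
  rangeSync(x * 100, 2, (v) => receivedSync.push(v));
});

console.log(receivedSync.join(', '));   // 100, 101, 200, 201, 300, 301
```

By the time the outer loop reaches 2, the inner range for 1 has already finished, so switching has no pending inner work to drop.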

However, this changes significantly if you add, for example, delay(0). Common sense suggests that this shouldn't do anything:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2).delay(0);
  });

source.subscribe(value => console.log('I got a value ', value));

Now only the inner RangeObservable is scheduled, and it is disposed and re-created several times, so only the values from the very last RangeObservable are emitted:

I got a value  300
I got a value  301
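
A rough way to picture this, assuming delay(0) defers each inner value to a later timer task while the outer range still runs as a synchronous loop. The `timers` array below is a stand-in for the event loop's timer queue so that the sketch stays deterministic; `later`, `delayedRange` and the other names are hypothetical.

```javascript
// Toy model: inner values are deferred, the outer loop is synchronous. Not RxJS code.
const timers = [];                       // stand-in for the timer queue

function later(fn) {
  const task = { fn, cancelled: false };
  timers.push(task);
  return task;
}

// Defers every value of the range; returns a function that cancels them all.
function delayedRange(start, count, onNext) {
  const tasks = [];
  for (let i = 0; i < count; i++) {
    tasks.push(later(() => onNext(start + i)));
  }
  return () => tasks.forEach((t) => { t.cancelled = true; });
}

const receivedDelayed = [];
let cancelInner = null;

for (let x = 1; x <= 3; x++) {           // outer range: plain synchronous loop
  if (cancelInner) cancelInner();        // switch: cancel the previous inner range
  cancelInner = delayedRange(x * 100, 2, (v) => receivedDelayed.push(v));
}

// The deferred tasks only get to run after the synchronous code has finished.
timers.forEach((t) => { if (!t.cancelled) t.fn(); });

console.log(receivedDelayed.join(', ')); // 300, 301
```

All three outer values are processed before any deferred inner value gets a chance to run, so only the last inner range survives.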
martin
  • Still I think I have a missing piece (between step `4` and `5`) : Who said that `2` is emitted BEFORE `101` ? – Royi Namir Apr 20 '17 at 20:52
  • @Royi `101` is never emitted. [It's just scheduled](https://github.com/Reactive-Extensions/RxJS/blob/master/src/modular/observable/range.js#L20) but later canceled because the `RangeObservable` is unsubscribed. – martin Apr 20 '17 at 20:57