
I have the following RxJS code:

let items = Observable.of({
  val: "one",
  type: "string"
}, {
  val: "five",
  type: "string"
}, {
  val: 2,
  type: "integer"
}, {
  val: 10,
  type: "integer"
}, {
  val: 20,
  type: "integer"
});

items
  .groupBy(x => x.type)
  .subscribe(obs => {
    let head = obs.first();
    let tail = obs.skip(1);
    head
      .do(x => console.log('head == ', x))
      .concat(tail)
      .do(x => console.log('tail == ', x))
      .subscribe(x => console.log('sub == ', x));

  });

Here's the output:

head ==  Object {val: "one", type: "string"}
tail ==  Object {val: "one", type: "string"}
sub ==  Object {val: "one", type: "string"}

head ==  Object {val: 2, type: "integer"}
tail ==  Object {val: 2, type: "integer"}
sub ==  Object {val: 2, type: "integer"}
tail ==  Object {val: 20, type: "integer"}
sub ==  Object {val: 20, type: "integer"}

Why is the second item not getting emitted?

Here's a Plunk that demonstrates my problem.

Nathan Jones

1 Answer


It is a bit hard to explain what is happening here, but there is no magic, so let me try to walk through it step by step. A proposed workaround follows the lengthy explanation.

Let's call observer the function obs => { let head = obs.first(); ... } that you pass to the outer subscribe.

  • Observable.of(...) is a cold observable: it emits its items one by one, from the start, each time an observer subscribes (definition of a cold observable).
  • groupBy is also a cold observable, but the observables it creates (your type groups) are hot. That means that when an item comes in, it is routed to its group observable and immediately emitted to whichever observers are subscribed at that moment (definition of a hot observable). If there is no observer, the value is lost.
  • Your first subscribe(observer) leads to the following :
    • observer subscribes to groupBy, which subscribes to Observable.of, which hence emits the first item from items (val: "one", type: "string"). Let's call it item_string_1.
    • groupBy creates the first group hot observable (call it obs_string), emits it to observer, THEN emits the first item on that first group.
    • when obs_string is emitted, observer is executed (remember that observers are executed every time their related observable emits). That means that item_string_1 will be emitted AFTER observer is executed.
    • First observer execution : Your subscribe in the observer leads to the following:
      • subscription of observer2 (x => console.log('sub == ', x)) to concat(tail)
      • concat(tail) subscribes immediately to head and DOES NOT subscribe to tail in this moment. It will subscribe to it only when head has completed. That's one of the key points.
      • head leads to subscription to first which leads to subscription to obs, which leads to... nothing as no value has been emitted by obs.
      • end of observer execution
    • then item_string_1 is emitted on obs_string. That leads to :
      • head emitting item_string_1, concat(tail) emitting item_string_1, and all your console.log calls showing Object {val: "one", type: "string"}.
      • head completes
    • then item_string_2 is emitted on obs_string. That leads to :
      • Because head has completed, concat(tail) is by now subscribed to tail, i.e. to skip(1), which is itself subscribed to obs (obs_string at this point). skip(1) drops the first value it sees, counted from the moment it subscribed, and that value is item_string_2, the second value emitted by the group. So concat(tail) emits... nothing. Conclusion: your second value is NEVER emitted.
    • then item_integer_1 is emitted. Same behaviour as item_string_1
    • then item_integer_2 is emitted. Same behaviour as item_string_2 (skipped)
    • then item_integer_3 is emitted. Now the skip(1) emits val: 20, type: "integer", which makes its way to the console.log messages.
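
The sequence above can be reproduced without RxJS. The following is only a sketch under stated assumptions: HotSource and headThenTail are hypothetical, hand-rolled stand-ins (not RxJS APIs) that mimic a hot group observable and the head.concat(tail) chain, where tail is subscribed only once head has completed.

```javascript
// Hand-rolled stand-in for a hot observable (NOT the RxJS API): values are
// pushed to whoever is subscribed at that moment and are never replayed.
class HotSource {
  constructor() { this.listeners = []; }
  subscribe(fn) {
    this.listeners.push(fn);
    // Return an unsubscribe function.
    return () => { this.listeners = this.listeners.filter(l => l !== fn); };
  }
  emit(value) {
    // Iterate over a copy so that subscriptions made during an emission
    // only see later values.
    this.listeners.slice().forEach(fn => fn(value));
  }
}

// Mimics head.concat(tail) with head = obs.first() and tail = obs.skip(1).
function headThenTail(source, sink) {
  const unsubscribeHead = source.subscribe(value => {
    unsubscribeHead();   // first(): take one value, then complete
    sink(value);
    // Only now, after head has completed, is tail subscribed.
    let skipped = 0;
    source.subscribe(v => {
      if (skipped < 1) { skipped += 1; return; } // skip(1) counts from here
      sink(v);
    });
  });
}

const group = new HotSource();
const received = [];
headThenTail(group, v => received.push(v));

// Same values as the "integer" group in the question.
[2, 10, 20].forEach(v => group.emit(v));
console.log(received); // [ 2, 20 ]
```

The value 10 is lost for the same reason as in the question: it is the first value the late skip(1) subscription ever sees.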

Proposed workaround:

I am not sure I fully understand your goal, but if you want to do something specific depending on the index of the element within its group, you can use map, whose selector receives (value, index), as follows:

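// process_first / process_others are placeholders for your own functions
items
  .groupBy(x => x.type)
  .subscribe(obs => obs
    // index restarts at 0 for each group observable
    .map((value, index) => index === 0 ? process_first(value)
                                       : process_others(value))
    .subscribe(x => console.log('sub == ', x)));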
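
To see why a single index-aware subscription does not lose values, here is a small dependency-free sketch. Everything in it is an assumption made for illustration: createHotGroup and mapWithIndex are hypothetical helpers (not RxJS), and processFirst / processOthers stand in for the process_first / process_others placeholders.

```javascript
// Hypothetical handlers standing in for process_first / process_others.
const processFirst = value => `first:${value.val}`;
const processOthers = value => `other:${value.val}`;

// Minimal hot source (hand-rolled, not RxJS): no replay for late subscribers.
function createHotGroup() {
  const listeners = [];
  return {
    subscribe: fn => { listeners.push(fn); },
    emit: value => listeners.slice().forEach(fn => fn(value)),
  };
}

// Similar in spirit to obs.map((value, index) => ...): one subscription,
// with a counter that starts at 0 for the first value seen.
function mapWithIndex(source, project, sink) {
  let index = 0;
  source.subscribe(value => sink(project(value, index++)));
}

const integers = createHotGroup();
const results = [];
mapWithIndex(
  integers,
  (value, index) => (index === 0 ? processFirst(value) : processOthers(value)),
  x => results.push(x)
);

[2, 10, 20].forEach(val => integers.emit({ val, type: "integer" }));
console.log(results); // [ 'first:2', 'other:10', 'other:20' ]
```

Because the only subscription is made before the group emits anything, every value passes through it, and the index alone decides which handler runs.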
user3743222
  • Thanks for putting so much detail into this fantastic answer! I'm still a little unclear on a few things: 1) when `item_string_1` is emitted, could you elaborate on why `concat(tail)` emits `item_string_1`? 2) if i understand you correctly, the `obs` observer can be executed when a) the hot observer group (eg., `obs_string`) gets emitted, or a value from that group is emitted (`item_string_1`). How do I tell when `obs` will be `a` or `b`? I'm accepting your answer now, because your suggestion did help although I went with `concatMap` instead. – Nathan Jones Dec 30 '15 at 04:03
  • 1) The `obs1.concat(obs2)` observable will subscribe to `obs1`, emit the values from `obs1` till `obs1` completes, and then subscribe to `obs2` and emits the values from obs2. So as `obs1` (`head` here) emits `item_string_1 ` that's what the `concat` will emit – user3743222 Dec 30 '15 at 05:00
  • 2) No, `obs` is not an observer. An observer is a function, `obs` is an observable. Here `obs.subscribe(observer)` entails that the observer function will be executed every time obs emits a value. There are not two cases, only one. Here, `obs_string` leads to `observer` execution, the `item_string_1` is emitted on `obs_string` and leads to the execution of any observer that have subscribed to `obs_string`, here at the end of the chain it leads to executing the `console.log` – user3743222 Dec 30 '15 at 05:06
  • For the first and second items, `obs` is `obs_to_string`. For third, fourth and fifth item, `obs` is `obs_to_integer`. The complete chain of execution would be in all cases `obs->first->head->concat->console.log` – user3743222 Dec 30 '15 at 05:07
  • Lastly, `concatMap` also gives you access to inner and outer index information, so that can help achieve your goal too. Cf https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/concatmap.md – user3743222 Dec 30 '15 at 05:12
  • *typo in my comment starting with `For the first and second items`. Replace `obs->...` by `obs->first->concat->console.log`. I haven't included the `do` but of course they are also traversed. – user3743222 Dec 30 '15 at 05:14
  • I'm a little slow with rx programming, so please forgive me for not understanding. In your main answer you say: "groupBy creates the first group hot observable (call it obs_string), emits it to observer THEN emits the first item on that first group." let's just talk about those emissions. when the first group observable is emitted, does that get passed to the observer, THEN the first element of the `items` Observable gets emitted to the same observer? – Nathan Jones Dec 30 '15 at 05:31
  • is there a way to do a line-by-line walkthrough of what happens on each `observer` execution? i think seeing that for myself might help me understand what's really going on. – Nathan Jones Dec 30 '15 at 05:38
  • No problem, I found it hard too in my beginnings. There really are two different things happening. The first group observable is emitted and the observer is executed with that first group observable as a parameter (the observer is `obs => {let head... subscribe(observer2)`). At that time all that does is subscribing `observer2` to an observable derived from `obs` (chain is `obs->first->concat`). Then the first item is emitted by the first group observable. That first item goes down the chain `obs->first->concat` and executes `observer2` and the log message ends up printed. – user3743222 Dec 30 '15 at 05:39
  • To trace what is happening, do what you already did, put a `do` after every operator you use and observe what gets out. I hope to find some time tomorrow and draw you a marble. A drawing should be more efficient at explaining – user3743222 Dec 30 '15 at 05:41
  • I added more `do` statements to my original [Plunk](http://plnkr.co/edit/aqthwXUfH6oTwTKp9M56?p=preview). I only get `in obs` printed once, so the `groupBy` observable only emits once -- the first group observable. Where is the first item (`item_string_1`) emitted? – Nathan Jones Dec 30 '15 at 06:04
  • The groupBy emits one observable (there is only one group). THAT observable emits the items. You subscribed indirectly to that observable with your `console.log` observer. Ok I'll do a drawing later. You can go ahead and review the marbles here : http://reactivex.io/documentation/operators/groupby.html – user3743222 Dec 30 '15 at 06:17
  • ah, ok. I think I'm getting closer to understanding this. Thanks for your time. – Nathan Jones Dec 30 '15 at 06:21
  • Did you get a chance to make that drawing you mentioned? I hope you didn't misinterpret "getting closer to understanding" from my previous comment to mean I wouldn't like to see that drawing. – Nathan Jones Jan 05 '16 at 06:13
  • cf. http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444 (see addition). About the group thing, I could not do better than the marbles, I am afraid. – user3743222 Jan 08 '16 at 04:56