
I'm confused by the behavior of a shared stream that is created using Rx.Observable.just.

For example:

var log = function(x) { console.log(x); };

var cold = Rx.Observable
  .just({ foo: 'cold' });

cold.subscribe(log); // <-- Logs three times
cold.subscribe(log);
cold.subscribe(log);

var coldShare = Rx.Observable
  .just({ foo: 'cold share' })
  .share();

coldShare.subscribe(log); // <-- Only logs once
coldShare.subscribe(log);
coldShare.subscribe(log);

Both streams only emit one event, but the un-shared one can be subscribed to three times. Why is this?

I need to "fork" a stream but share its value (and then combine the forked streams).

How can I share the value of a stream but also subscribe to it multiple times?

I realize that this is probably related to the concept of "cold" and "hot" observables. However:

  • Is the stream created by Rx.Observable.just() cold or hot?
  • How is one supposed to determine the answer to the previous question?
Paul Murray

2 Answers

Is the stream created by Rx.Observable.just() cold or hot?

Cold.

How is one supposed to determine the answer to the previous question?

I guess the documentation is the only guide.

How can I share the value of a stream but also subscribe to it multiple times?

You are looking for a connectable observable. For example:

var log = function(x) { console.log(x); };
var coldShare = Rx.Observable
  .just({ foo: 'cold share' })
  .publish();

coldShare.subscribe(log); // Does nothing
coldShare.subscribe(log); // Does nothing
coldShare.subscribe(log); // Does nothing

coldShare.connect(); // Emits one value to its three subscribers (logs three times)

var log = function(x) {
  document.write(JSON.stringify(x));
  document.write("<br>");
};

var coldShare = Rx.Observable
  .just({ foo: 'cold share' })
  .publish();

coldShare.subscribe(log); // <-- Nothing is logged until connect() is called
coldShare.subscribe(log);
coldShare.subscribe(log);

coldShare.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"></script>

The example above logs three times. Using publish and connect, you essentially "pause" the observable until the call to connect.
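To see why the moment of connection matters for a source that emits synchronously, here is a minimal hand-rolled sketch. It does not use RxJS and is not how RxJS is implemented internally; `coldJust`, `publish` and `recorder` are hypothetical stand-ins written for this illustration. It assumes, as in RxJS 4, that `share()` behaves like `publish().refCount()`, i.e. it connects as soon as the first subscriber arrives.

```javascript
// Hypothetical stand-in for Rx.Observable.just: emits synchronously on subscribe.
function coldJust(value) {
  return {
    subscribe: function (observer) {
      observer.onNext(value);
      observer.onCompleted();
    }
  };
}

// Hypothetical stand-in for publish(): observers are collected, and the
// source is only subscribed to when connect() is called.
function publish(source) {
  var observers = [];
  var done = false;
  return {
    subscribe: function (observer) {
      // A subscriber that arrives after completion only sees the completion.
      if (done) { observer.onCompleted(); return; }
      observers.push(observer);
    },
    connect: function () {
      source.subscribe({
        onNext: function (x) {
          observers.forEach(function (o) { o.onNext(x); });
        },
        onCompleted: function () {
          done = true;
          observers.forEach(function (o) { o.onCompleted(); });
        }
      });
    }
  };
}

// Observer that records every value it receives into the given array.
function recorder(into) {
  return {
    onNext: function (x) { into.push(x); },
    onCompleted: function () {}
  };
}

// Case 1: connect after all three observers are attached (publish + connect).
var lateConnect = [];
var published = publish(coldJust('value'));
published.subscribe(recorder(lateConnect));
published.subscribe(recorder(lateConnect));
published.subscribe(recorder(lateConnect));
published.connect();
console.log(lateConnect.length); // 3

// Case 2: connect right after the first observer (roughly what share() does).
var earlyConnect = [];
var shared = publish(coldJust('value'));
shared.subscribe(recorder(earlyConnect));
shared.connect();
shared.subscribe(recorder(earlyConnect));
shared.subscribe(recorder(earlyConnect));
console.log(earlyConnect.length); // 1
```

In the second case the source has already emitted and completed by the time the second and third observers subscribe, which matches the "only logs once" behavior in the question. A source that emits asynchronously gives all three observers time to attach before the value arrives.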

Whymarrh
  • So, the truth is I already knew about `publish()` and `connect()`. However, what confuses me is that if coldShare is: `Rx.Observable.interval().take(1).share()` Then it *does* log three times, and you don't need to use `publish()` or `connect()`. Why is that? – Paul Murray Nov 14 '15 at 23:20
  • @PaulMurray [`share`](https://github.com/Reactive-Extensions/RxJS/blob/0d908f4cf0023fa1bba2788f5432b18bdd8857ce/doc/api/core/operators/share.md) is a special case of `publish` and `connect` – Whymarrh Nov 14 '15 at 23:36
  • So, is it true that `Rx.Observable.interval().take(1)` is hot and `Rx.Observable.just()` is cold? – Paul Murray Nov 14 '15 at 23:47
  • Yup, `Rx.Observable.interval().take(1)` is hot because `Rx.Observable.interval()` is hot (and because `.take(1)` doesn't change hot to cold) – Whymarrh Nov 14 '15 at 23:49
  • Er, that's not correct, `.interval()` is a cold observable. – Whymarrh Nov 14 '15 at 23:54
  • So, they're both cold? And I still don't understand how one determines if an observable is hot or cold. Reading [this documentation](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/return.md) doesn't seem to help – there is never any mention of 'hot' or 'cold' anywhere in the documentation, so how do you know? (and I don't mean to sound snarky, it's an honest question) – Paul Murray Nov 15 '15 at 00:03
  • @PaulMurray you can likely assume that an observable is cold unless told otherwise. Hot vs. cold can be thought of as active vs. passive (respectively), and most observables are passive in the sense that they don't do anything until someone subscribes. Examples of hot observables (where values are emitted regardless of whether or not someone is subscribed) include mouse events, external API updates, etc. – Whymarrh Nov 15 '15 at 00:32
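The active vs. passive distinction from the last comment can be sketched without RxJS. This is only an illustration: Node's built-in `EventEmitter` plays the role of a hot source (a stand-in for mouse events), and `coldSubscribe` is a hypothetical function standing in for a cold observable.

```javascript
var EventEmitter = require('events');

// Hot (active): the emitter fires whether or not anyone is listening.
var clicks = new EventEmitter();
var hotSeen = [];
clicks.emit('click', 'first');   // nobody is listening yet, so this value is lost
clicks.on('click', function (x) { hotSeen.push(x); });
clicks.emit('click', 'second');  // delivered to the listener attached above
console.log(hotSeen); // [ 'second' ]

// Cold (passive): nothing is produced until someone subscribes, and every
// subscriber triggers its own run of the producer.
function coldSubscribe(onNext) {
  onNext('first');
  onNext('second');
}
var coldSeen = [];
coldSubscribe(function (x) { coldSeen.push(x); });
console.log(coldSeen); // [ 'first', 'second' ]
```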

I don't understand your first question, but as for the last one, which I also had trouble with:

  • The RxJS implementation of Observables/Observers is based on the observer pattern, which is similar to the good old callback mechanism.
  • To illustrate, here is the basic form of creating an observable (taken from the doc at https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/create.md):

    var source = Rx.Observable.create(function (observer) {
        observer.onNext(42);
        observer.onCompleted();
    
        // Note that this is optional, you do not have to return this if you require no cleanup
        return function () {
            console.log('disposed');
        };
    });
    
  • Rx.Observable.create takes as its argument a function (call it factory_fn, to be original) which takes an observer. Your values are generated by a computation of your choice in the body of factory_fn, and because you have the observer as a parameter you can process/push the generated values when you see fit. BUT factory_fn is not executed at that point; it is just registered (like a callback would be). It will be called every time there is a subscribe(observer) on the related observable (i.e. the one returned by Rx.Observable.create(factory_fn)).

  • Once subscription is done (creation callback called), values flow to your observer according to the logic in the factory function and it remains that way till your observable completes or the observer unsubscribes (supposing you did implement an action to cancel value flow as the return value of factory_fn).
  • What that basically means is that, by default, Rx.Observables are cold.
  • My conclusion, after using the library quite a bit, is that unless it is duly documented, the only way to know FOR SURE the temperature of an observable is to read the source code. Or add a side effect somewhere, subscribe twice and see if the side effect happens twice or only once (which is what you did). That, or ask on Stack Overflow.
  • For instance, Rx.Observable.fromEvent produces hot observables, as you can see from the last line in the code (return new EventObservable(element, eventName, selector).publish().refCount();). (code here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/fromevent.js). The publish operator is among those operators which turn a cold observable into a hot one. How that works is out of scope, so I won't detail it here.
  • But Rx.DOM.fromWebSocket does not produce hot observables (https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/src/dom/websocket.js). Cf. the question "How to buffer stream using fromWebSocket Subject".
  • Confusion often comes, I think, from the fact that we conflate the actual source (say, a stream of button clicks) and its representation (Rx.Observable). It is unfortunate when that happens, but what we imagine as hot sources can end up being represented by a cold Rx.Observable.
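The "add a side effect and subscribe twice" check described above can be sketched as follows. This is a dependency-free illustration, not RxJS itself: `create` is a hypothetical stand-in that mimics only the registration behavior of Rx.Observable.create, and `runs` is a counter introduced for the example.

```javascript
// Hypothetical stand-in for Rx.Observable.create: it only stores factoryFn
// and calls it once per subscribe(observer).
function create(factoryFn) {
  return {
    subscribe: function (observer) {
      return factoryFn(observer);
    }
  };
}

var runs = 0; // side effect used as a probe

var source = create(function (observer) {
  runs += 1; // counts how many times the producer logic is executed
  observer.onNext(42);
  observer.onCompleted();
});

// Creating the observable did not run the factory function.
var runsBeforeSubscribe = runs;
console.log(runsBeforeSubscribe); // 0

var noop = { onNext: function () {}, onCompleted: function () {} };
source.subscribe(noop);
source.subscribe(noop);

// Two subscriptions, two executions of the producer: the source is cold.
console.log(runs); // 2
```

If the counter had stayed at 1 after the second subscription, the producer would be shared between subscribers, which is the behavior expected from a hot (or published) observable.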

So, yes, Rx.Observable.just creates cold observables.

user3743222