While reading an intro to RxJS I came across the following and was a bit concerned:

the second subscription will restart the sequence from the first value.

How does it start from the first value? Does it store all values in memory? That could be a real problem for me, as I am using it in a worker/service that will stay running. If it is holding on to all of them, then I'm headed for a massive blow-up.

halfer
Raif

1 Answer

Standard subscriptions do not buffer any values. Some operators (and subjects) do need to buffer some values to implement their behaviour (and that buffer can be unbounded) but that is a problem distinct from the hot vs. cold dichotomy.

The short explanation is that a cold source observable (the most upstream observable) knows how to generate its values but only does so when it has a subscriber, and it generates the same values for every subscriber. So there is no buffering; it is more like regeneration of values. For instance, Rx.Observable.range(1,10) knows which values it has to generate, and generates them any time there is a subscriber. It does not keep a buffer with 1,2,3...10 in memory, just 1 and 10, and iterates in between to generate the values. The same goes for most cold observables: they have a value-generating function associated with them, and that function is re-executed anew for each subscriber.
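To make the "regeneration, not buffering" point concrete, here is a minimal hand-rolled sketch. It does not use RxJS and is not how the library is implemented; `coldRange` is a hypothetical helper invented for illustration. It only stores the start value and the count, and re-runs its generating loop for each subscriber.

```javascript
// Hand-rolled illustration of a cold observable. NOT the RxJS implementation.
// `coldRange` is a hypothetical helper: it keeps only `start` and `count`.
function coldRange(start, count) {
  return {
    subscribe(onNext, onCompleted) {
      // The generating loop runs afresh for every subscriber;
      // no array of past values is kept anywhere.
      for (let i = 0; i < count; i++) {
        onNext(start + i);
      }
      if (onCompleted) onCompleted();
    },
  };
}

const source = coldRange(1, 10);
const first = [];
const second = [];

// Each subscription triggers its own independent run of the loop.
source.subscribe((v) => first.push(v));
source.subscribe((v) => second.push(v));

console.log(first.join(","));
console.log(second.join(","));
```

Both subscribers see the full sequence, yet the only memory held between subscriptions is the pair of numbers captured by the closure. The arrays `first` and `second` exist only because this example chose to collect the values.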

If you instead want an observable to push each value, as soon as it receives or generates it, to whichever subscribers exist at that moment, you have to convert your cold observable to a hot one.
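The hot behaviour can be sketched the same way, again without RxJS. `createHotSource` and its `emit` method are hypothetical names used only for this illustration; in RxJS itself the conversion is typically done with operators such as `publish()` or `share()`. The point of the sketch is that a value is handed to the current subscribers and then dropped, so a late subscriber never sees earlier values.

```javascript
// Hand-rolled illustration of a hot source. NOT the RxJS implementation.
// `createHotSource` and `emit` are hypothetical names for this sketch.
function createHotSource() {
  const observers = new Set();
  return {
    subscribe(onNext) {
      observers.add(onNext);
      // Return an unsubscribe function so long-running services can clean up.
      return () => observers.delete(onNext);
    },
    emit(value) {
      // Push to whoever is subscribed right now; the value is not stored.
      observers.forEach((onNext) => onNext(value));
    },
  };
}

const hotSource = createHotSource();
const early = [];
const late = [];

hotSource.subscribe((v) => early.push(v));
hotSource.emit(1);
hotSource.emit(2);

// This subscriber joins after 1 and 2 were emitted and will not receive them.
hotSource.subscribe((v) => late.push(v));
hotSource.emit(3);

console.log(early.join(","));
console.log(late.join(","));
```

Here the early subscriber collects every emitted value while the late one only gets what arrives after it subscribed, which is the behaviour you would expect from a live event feed.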

For a more in-depth explanation, have a look at the illustrated subscription and data flows corresponding to hot and cold observables.

Community
user3743222
  • Ah, great explanation. Perhaps you can help me with my implementation then. I have a tcp connection to an eventstore that is pushing new values as they arrive, a message broker if you will. I am connecting to that with rx.Observable.fromEvent(eventstore.subscribeToAllFrom(), 'event'). I'm filtering that and mapping it then subscribing to it and handing the event to a subset of modules to further process. Is this the right pattern to use? – Raif Feb 05 '16 at 18:22
  • You will need to ask another question for that, and describe it accurately. Also, it would be nice if you upvote answers if they are useful, and accept them if they indeed answered your questions. It is useful for the next users with a similar problem who will stumble on this page from a google search to identify which of the answers is a good/better one, hence those important SO guidelines. Basically it is about making answers useful not only to you but to others too. Reviewing (very quickly) your SO history it seems you have pending a few answers to accept. – user3743222 Feb 05 '16 at 18:56
  • Hi, that sounds legit. I'm hesitant to ask an "is my implementation correct" question, as they tend to be highly specific. As for marking correct and upvoting, I fully intend to mark your answer correct and upvote it. I was just waiting to hear back from you. And on my other question I was waiting to actually try the answer before accepting it, although I do believe it to be correct. I do very much appreciate your help. This is a complicated (to me) topic. Finally :) re my history, I looked at a couple of my questions and I think if there is a correct answer I mark it, but I may have missed a few – Raif Feb 05 '16 at 19:32
  • About your history, yes, no worries, I just looked really quickly. About your question, it would not be the first similar question on SO. So I encourage you to ask away (and post some code). If you leave it in the comments, it is very likely that only I will be aware of it, while if you ask it out loud, everybody can answer. – user3743222 Feb 06 '16 at 00:11
  • ah, and yes, wait to try an answer before accepting it. Maybe it does not work, so it is better to wait I think. – user3743222 Feb 06 '16 at 00:11