
I'm trying to use an rxjs observable to delegate, but share, a piece of expensive work across the lifetime of an application.

Essentially, something like:

var work$ = Observable.create((o) => {
  const expensive = doSomethingExpensive();
  o.next(expensive);
  o.complete();
})
.publishReplay(1)
.refCount();

Now, this works fine and does exactly what I want, except for one thing: if all subscribers unsubscribe, then when the next one subscribes, my expensive work happens again. I want to keep it.

Now, I could use a subject, or I could remove the refCount() and call connect() manually (and never disconnect). But that would make the expensive work happen the moment I connect, not the first time a subscriber tries to consume work$.

Essentially, I want something akin to refCount that only uses the first subscription to connect, and never disconnects. A "lazy connect".

Is such a thing possible at all?

shados
  • Does it work to simply create the as a startWith() then chain a mapping operation to it (using the identity function) and let the consumers subscribe to the mapped output Observable instead of the actual source? – user268396 Jan 14 '17 at 22:58
  • This is documented behaviour, at least. It's kinda dirty, but maybe you can add one subscriber that will basically make the observable keep a refcount of 1 or more, so it doesn't flush the cached piece of work. I went through the API, but I can't find another way to keep the cache without subscribers. – GolezTrol Jan 14 '17 at 23:21
  • So, I played with it a bit more, and this is not doing what I'd expect (but it is doing what I want!). [See this fiddle](https://jsfiddle.net/phoenixmatrix/metrezfd/). I'm not sure I understand. I would have expected refCount and the publishReplay operation to happen again (and to create a new connectable observable) from the source data every time subscribers hit 0. But this isn't happening. – shados Jan 15 '17 at 00:23

1 Answer


How does publishReplay() actually work

It internally creates a ReplaySubject and makes it multicast compatible. The minimum buffer size of a ReplaySubject is 1 emission. This results in the following:

  • The first subscription triggers publishReplay(1) to subscribe internally to the source stream and pipe all emissions through the ReplaySubject, effectively caching the last n (= 1) emissions.
  • If a second subscription is started while the source is still active, multicast() connects it to the same ReplaySubject, and it receives all subsequent emissions until the source stream completes.
  • If a subscription is started after the source has already completed, the ReplaySubject has cached the last n emissions, and the new subscriber receives only those before completing.

const source = Rx.Observable.from([1,2])
  .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
  .do(null,null,() => console.log('source stream completed'))
  .publishReplay(1)
  .refCount();

// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));

// new subscription after the stream has completed already
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
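
To make the caching behaviour described above easier to see in isolation, here is a minimal sketch in plain JavaScript. It is not RxJS's implementation; `lazyReplayLast` and its callback-style observers are hypothetical names invented for this illustration. It models "subscribe to the source on the first subscriber, cache the last value, replay it to late subscribers", and, unlike refCount(), it never disconnects from the source once started.

```javascript
// Hypothetical helper, NOT part of RxJS: a simplified model of a lazily
// connected replay buffer of size 1.
function lazyReplayLast(subscribeToSource) {
  let started = false;    // has the source been subscribed to yet?
  let completed = false;  // has the source completed?
  let hasValue = false;   // is there a cached emission?
  let lastValue;
  const observers = new Set();

  return {
    subscribe(observer) {
      // Replay the cached emission (buffer size 1) to every new subscriber.
      if (hasValue) observer.next(lastValue);

      if (completed) {
        // Source already finished: replay, then complete. No re-subscription.
        observer.complete();
        return () => {};
      }

      observers.add(observer);

      if (!started) {
        // The first subscriber triggers the one and only source subscription.
        started = true;
        subscribeToSource({
          next(value) {
            hasValue = true;
            lastValue = value;
            observers.forEach(o => o.next(value));
          },
          complete() {
            completed = true;
            observers.forEach(o => o.complete());
            observers.clear();
          },
        });
      }

      // Unsubscribing removes this observer but leaves the source running.
      return () => observers.delete(observer);
    },
  };
}

// Usage: the producer below stands in for doSomethingExpensive().
let runs = 0;
const work = lazyReplayLast(observer => {
  runs += 1;
  observer.next('result');
  observer.complete();
});

const log = [];
work.subscribe({
  next: v => log.push('sub1:' + v),
  complete: () => log.push('sub1 completed'),
});
work.subscribe({
  next: v => log.push('late:' + v),
  complete: () => log.push('late completed'),
});
console.log(runs, log);
```

In this sketch the producer runs once, on the first subscribe() call, and the second subscriber only receives the cached value followed by completion. Whether the real operators behave the same way in every RxJS version is something to verify against the version you use.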
Mark van Straten