I'm trying to subscribe to an observable and log its value.

Flow:

this.myObservable$ = new Subject<string>();
this.myObservable$.next('hello world') //in (angular) service A 

wait a few seconds

let newObservable$ = this.myObservable$.asObservable().subscribe(message => console.log(message)); //subscribe in service B, nothing happens

I understand this fails because I subscribed after the value was emitted. But adding shareReplay does not help either:

let newObservable$ = this.myObservable$.asObservable()
      .pipe(
        shareReplay(1),
      )
      .subscribe(message => console.log(message)); //also nothing happens

Can anyone help me understand why I can't 'shareReplay' this observable and get the last emitted value?

Vincent
  • Can't you just use a `BehaviorSubject` or `ReplaySubject`? I think the `shareReplay` is only triggered when your Observable has a subscriber and further subscribers will receive the same value. – Fussel Feb 17 '21 at 16:52

2 Answers

shareReplay isn't working for you because you are applying it to the derived observable rather than to the source.

You should apply it to the source:

this.mySubject$ = new Subject<string>();
this.myObservable$ = this.mySubject$.asObservable().pipe(shareReplay(1));

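Now a late subscriber gets the replayed value, as long as myObservable$ already had a subscriber when the value was emitted (shareReplay only subscribes to its source once it has a first subscriber):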

let newObservable$ = this.myObservable$.subscribe(
   message => console.log(message)
);

In your case, shareReplay was applied in the definition of newObservable$, after the value had already been emitted, so there was nothing left for it to replay.
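One caveat is worth sketching: shareReplay connects to its source lazily, so placing it on the source only helps if something has subscribed before the value is emitted. The snippet below is a minimal illustration, assuming RxJS 6 or later; the names `cold$`, `warm$`, `coldLog` and `warmLog` are made up for this example and are not from the original code.

```typescript
import { Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Case 1: shareReplay is on the source, but nobody has subscribed yet.
const subjectA$ = new Subject<string>();
const cold$ = subjectA$.pipe(shareReplay(1));
const coldLog: string[] = [];

subjectA$.next('hello world');           // no subscriber yet, so shareReplay never sees this
cold$.subscribe(m => coldLog.push(m));   // late subscriber: nothing to replay

// Case 2: same setup, but one subscriber exists before the emission.
const subjectB$ = new Subject<string>();
const warm$ = subjectB$.pipe(shareReplay(1));
const warmLog: string[] = [];

warm$.subscribe();                       // connects shareReplay to the subject
subjectB$.next('hello world');           // shareReplay buffers this value
warm$.subscribe(m => warmLog.push(m));   // late subscriber receives the replay

console.log(coldLog); // stays empty
console.log(warmLog); // holds 'hello world'
```

If no early subscriber can be guaranteed, a ReplaySubject avoids the problem because the subject itself keeps the buffer.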

As others have mentioned, since you already have a subject as a source, you could simply use a ReplaySubject:

this.mySubject$ = new ReplaySubject<string>(1);
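As a rough sketch of what that buys you (assuming RxJS 6 or later; the `received` array is only there to make the result visible), a ReplaySubject with a buffer size of 1 hands the most recent value to a subscriber that arrives late:

```typescript
import { ReplaySubject } from 'rxjs';

const mySubject$ = new ReplaySubject<string>(1);
const received: string[] = [];

mySubject$.next('first');
mySubject$.next('hello world');   // buffer size 1: only this latest value is kept

// Subscribing after both emissions still yields the buffered value.
mySubject$.asObservable().subscribe(message => received.push(message));

mySubject$.next('later');         // live values arrive as usual

console.log(received); // 'hello world' followed by 'later'
```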

Normally, in the context of an Angular service, you would keep the Subject private on a service, and expose a readonly observable publicly to consumers. Something like this:

Service:

private subject$ = new Subject();
public readonly value$ = this.subject$.pipe(shareReplay()) // if you use .pipe(), you don't really need .asObservable()

Component:

data$ = this.service.value$;

Since shareReplay is defined on the source observable (value$), the component receives the replayed emission.
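A variant of this pattern, sketched under a few assumptions: it swaps Subject plus shareReplay for a ReplaySubject(1), so the replay does not depend on anyone having subscribed early. `MessageService`, `publish` and `seenByComponent` are hypothetical names, and the Angular `@Injectable()` decorator is left out so the snippet runs with RxJS alone.

```typescript
import { Observable, ReplaySubject } from 'rxjs';

// Hypothetical service; in Angular this class would carry @Injectable().
class MessageService {
  // Private: only the service itself can push values.
  private readonly subject$ = new ReplaySubject<string>(1);

  // Public, read-only view for consumers.
  readonly value$: Observable<string> = this.subject$.asObservable();

  publish(message: string): void {
    this.subject$.next(message);
  }
}

const service = new MessageService();
service.publish('hello world');   // emitted before any consumer exists

// Hypothetical late consumer, e.g. a component subscribing in ngOnInit.
const seenByComponent: string[] = [];
service.value$.subscribe(m => seenByComponent.push(m));

console.log(seenByComponent); // holds 'hello world'
```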

BizzyBob
  • https://stackblitz.com/edit/rxjs-8tw2tq?devtoolsheight=60 I tried this but it also never logs to the console. Isn't this the same as your code? – Vincent Feb 17 '21 at 18:19
  • It is not the same. It actually has the same mistake! :-) Check out this [StackBlitz](https://stackblitz.com/edit/rxjs-sfxcet?file=index.ts) with a few notes. – BizzyBob Feb 17 '21 at 18:50
  • Thank you very much for explaining again. But if I understand correctly, it is then impossible to ever receive the 'testing-1' value? – Vincent Feb 17 '21 at 18:55
  • It is possible, you just need to have a dedicated "source" observable to apply your `shareReplay` to. That is why I separated the `subject$` from the `myObservable$`, so that `myObservable$` would be the source, to which you would apply the `shareReplay()` operator. – BizzyBob Feb 17 '21 at 19:03
  • OK, derived2$ does indeed receive values even though it subscribed after source$.next emitted. However, for that you first have to create a derived$ observable from source$ (with the shareReplay). So if source$ emits ("testing-1") between the creation of the original observable and the creation of derived$ (with the shareReplay), you can never catch that event, right? – Vincent Feb 17 '21 at 19:16
  • Correct. Basically, the observable that consumers actually use needs to have the `shareReplay` applied. So you want to define the `derived$` observable immediately after your subject, so that there is no chance of the subject emitting something the observable doesn't receive. – BizzyBob Feb 17 '21 at 19:20
  • Not sure if this will help or not, but here's a super basic example in an angular context: [StackBlitz](https://stackblitz.com/edit/angular-ivy-xzshrw?file=src/app/some.service.ts) – BizzyBob Feb 17 '21 at 19:22
  • In an Angular context, I have a component which subscribes in onInit to a subject from a service. However, that service has already emitted while the component is still rendering... so it looks like there is an issue with the design. – Vincent Feb 17 '21 at 19:24
  • The replay functionality would need to live in the service, so hopefully it is a simple change to modify the service: either use a ReplaySubject or expose an observable that uses shareReplay. :-) – BizzyBob Feb 17 '21 at 19:34
  • This previous example works, but I honestly don't understand why. How could I shareReplay the 'testing-1' value in this example? https://stackblitz.com/edit/rxjs-byc2hc?file=index.ts – Sergi Dote Teixidor Apr 23 '21 at 05:42
  • @SergiDoteTeixidor Late subscribers won't receive prior emissions. You can use [`ReplaySubject`](https://www.learnrxjs.io/learn-rxjs/subjects/replaysubject) instead of `Subject`. Here is a working [StackBlitz](https://stackblitz.com/edit/so-66246196?file=index.ts). – BizzyBob Apr 23 '21 at 06:28
  • Thank you, this one helped me. Suddenly `rxjs` subscriptions didn't work as they used to, even though there was no change in `package.json` or in the code (except moving a few services around); using `ReplaySubject` fixed the issue. – Dom Nov 27 '21 at 13:18

I see multiple issues here

  1. You could use the asObservable() method of a multicast source such as Subject to get the observable. See here for usage of asObservable from RxJS' core dev.

  2. In the assignment newObservable$ = this.myObservable$.subscribe(...) you're actually assigning the subscription rather than the observable.

  3. The issue at hand: shareReplay doesn't appear to work because it isn't piped to the source from the start, but only later, after the value has been emitted. It will work for subsequent subscriptions.

Check the following illustration

var { Subject } = rxjs;
var { shareReplay } = rxjs.operators;

// Scenario 1 (analogous to OP's)

var subSource1 = new Subject();
var myObservable1$ = subSource1.asObservable();
myObservable1$.subscribe(m => console.log('Scenario 1: Initial subscription'));
subSource1.next('Hello world');
var newObservable1$ = myObservable1$.pipe(
  shareReplay(1),
);
// Never logs: 'Hello world' was emitted before shareReplay was attached
newObservable1$.subscribe(m => console.log(`Scenario 1: ${m}`));

// Scenario 2

var subSource2 = new Subject();
var myObservable2$ = subSource2.asObservable().pipe(
  shareReplay(1),
);
myObservable2$.subscribe(m => console.log('Scenario 2: Initial subscription'));
subSource2.next('Hello world');
var newObservable2$ = myObservable2$;
// Logs 'Scenario 2: Hello world': shareReplay was connected before the emission
newObservable2$.subscribe(m => console.log(`Scenario 2: ${m}`));
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>
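Point 2 in the list above can be sketched as follows. This is only an illustration, assuming RxJS 6 or later; `source$`, `message$`, `subscription` and `log` are names invented for the example. It shows that `asObservable()` gives back something you can subscribe to, while `subscribe()` gives back a handle used to stop listening.

```typescript
import { Observable, Subject, Subscription } from 'rxjs';

const source$ = new Subject<string>();
const log: string[] = [];

// asObservable() returns an Observable: it can be piped and subscribed to again.
const message$: Observable<string> = source$.asObservable();

// subscribe() returns a Subscription: a handle whose main job is unsubscribe().
const subscription: Subscription = message$.subscribe(m => log.push(m));

source$.next('a');
subscription.unsubscribe();
source$.next('b');   // not recorded: the subscription was torn down

console.log(log); // holds only 'a'
```

Naming the result of `subscribe()` with a trailing `$` is therefore misleading, since the `$` suffix conventionally marks an observable.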
ruth