-1

I have two observable pipes. I need to run one after the other and compare two values equal or not. I tried the below code.This should work, when the first observable value emitted , it should go and take second obserbla value and should comapre it first return value.I need to some expert help , to refator this code better way.

   this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub)).subscribe(
          (res: UnitDetail) =>{
              if(res.unitTwo){
                this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub)).subscribe(
                  (unitId: string) => {
                    if(unitId ===  res.unitTwo){
                      this.sameUnit = true;
                    }else{
                      this.sameUnit = false;
                    }
                  });
              }
          }
       );
uma
  • 1,477
  • 3
  • 32
  • 63
  • 1
    Try `this.selectedUnitDetailModel$.pipe(withLatestFrom(this.appStore.select(selectUnit)), map(([{unitTwo}, unitId]) => unitTwo === unitId), tap(console.table), shareReplayUntil(this.destroySub)).subscribe(sameUnit => {this.sameUnit = sameUnit;})`. – Aluan Haddad Sep 13 '20 at 06:01
  • 1
    what is `shareReplayUntil`? – Rafi Henig Sep 13 '20 at 06:12
  • @RafiHenig i create Subject and bined all pipe to it , when component destroy , I will unsubscirbe that Subject. actually it used to unsubcribed observable – uma Sep 13 '20 at 06:20

2 Answers2

1
this.selectedUnitDetailModel$.pipe(shareReplayUntil(this.destroySub),mergeMap(
          (res: UnitDetail) =>{
              if(res.unitTwo){
               return this.appStore.select(selectUnit).
                pipe(shareReplayUntil(this.destroySub),map(
                  (unitId: string) =>  unitId ===  res.unitTwo);
              }
          }
       ).subscribe({
        next: (sameUnit: boolean) => {
           //do something 
        }
       });
WannaCSharp
  • 1,898
  • 2
  • 13
  • 19
1

You don't need higher order operators since the observables this.selectedUnitDetailModel$ and this.appStore.select(selectUnit) are independent of each other. Instead you could use functions like forkJoin, combineLatest or zip to get the notifications from them in parallel.

You could find difference b/n these functions here.

Try the following

forkJoin(
  this.selectedUnitDetailModel$.pipe(take(1)),      // <-- complete on first emission
  this.appStore.select(selectUnit).pipe(take(1))    // <-- complete on first emission
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);

forkJoin only emits when the source observables complete, so I've piped in take(1) to each observable. The forkJoin will now emit on the first emission of each observable and complete. So the need for your shareReplayUntil(this.destroySub) is mitigated.

However, if you need to keep the emission stream from the observables open, you could use combineLatest or zip instead. In this case, you could replace the take(1) with your ``shareReplayUntil(this.destroySub)`.

Update: continuous stream of this.selectedUnitDetailModel$ observable

Like I said before, you could use combineLatest instead of forkJoin to enable a continuous stream of data.

Try the following

import { Subject, combineLatest } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

combineLatest(
  this.selectedUnitDetailModel$,
  this.appStore.select(selectUnit)
).pipe(
  takeUntil(this.destroySub)         // <-- replaced with `takeUntil` operator
).subscribe(
  ([res, unitId]) => this.sameUnit = res.unitTwo === unitId,
  (error) => console.log(error)                     // <-- handle error
);
ruth
  • 29,535
  • 4
  • 30
  • 57
  • Thank you ,I will try this. One thing need to know , ' this.selectedUnitDetailModel$' this pipe value is change, then it work in second time ? (think , this.selectedUnitDetailModel$ is List -> i will select one by one item , then need to rechack unit equual or not ) – uma Sep 13 '20 at 08:21
  • 1
    @uma: No it wont. Like I said before for continuous stream of data you could use `combineLatest` instead. I've updated the answer. Please check if the updated code works for you. – ruth Sep 13 '20 at 08:26
  • Michael , I tried second code.combineLatest -> given some deprcated sonar issue.. , and the line ([res:UnitDetail , unitID: string ] , places given compile error like, undefnde :( – uma Sep 13 '20 at 08:38
  • 1
    @uma: I'm not sure how this `shareReplayUntil` is implemented. Try to replace it with `takeUntil` operator. It does the same thing that you require: close the open subscription when the component is destroyed. I've updated the answer as well. – ruth Sep 13 '20 at 08:41
  • 1
    @uma: Also please remove the type definitions from the array `([res, unitId])`. I've updated the answer. – ruth Sep 13 '20 at 08:44
  • Thank you for your support , i will try to 'combineLatest ' further, now it will given compile error here 'res.unitTwo' ,i think that pipe may return {} ,undeined and object :( . The depreate issue also there. :( – uma Sep 13 '20 at 08:52