0

The issue is in one angular component, I subscribe to two Observable (i.e. ObsA and ObsB) to get data from two BehaviorSubject, in ObsB.subscribe, I would like to build new object per each data item from ObsA.subscribe, this is when the concurrent nature of Observable gets in the way, basically, ObsB.subscribe seems to happen earlier than ObsA.subscribe, so there is always no ObsA's data to be used to build new objects in ObsB.subscribe. Below is the code that illustrate this:

In MyComponent.ts

ObsAData: any = null;
ObsBData: any = null;

ngOnInit() {
  ObsA.subscribe(dA => {
    this.ObsAData = dA;
  });

  ObsB.subscribe(dB => {
    if (!!dB) {
      //do work
      this.ObsBData = dB;
    } else {
      //no data from ObsB source, hence create new data here based on data from ObsA source
      this.CreateNewBData(this.ObsAData);
    }
  });}

The problem is at runtime, this.ObsAData is always null when it gets to the this.CreateNewBData call. In a sense, creating new ObsBData depends on the availability of ObsAData, so my natural instinct is to try to somehow "chain" these subscriptions such that the ObsB.subscribe happens after ObsA.subscribe, but i am not even sure that's logical as it totally runs against the concurrent nature of Observable. Any suggestions how to do this effectively? that is to ensure this.CreatenewBData always have non-null this.ObsAData as its argument AFTER this.ObsAData indeed is set to non-null data from this.ObsA source.

For Comment
  • 1,139
  • 4
  • 13
  • 25

2 Answers2

0

My preferred route is to use combineLatest in these situations.

// RxJS v6+
import { timer, combineLatest } from 'rxjs';

const timerOne$ = timer(1000, 4000);
const timerTwo$ = timer(2000, 4000);
const timerThree$ = timer(3000, 4000);

combineLatest(
  timerOne$,
  timerTwo$,
  timerThree$,
  // combineLatest also takes an optional projection function
  (one, two, three) => {
    return `Timer One (Proj) Latest: ${one}, 
              Timer Two (Proj) Latest: ${two}, 
              Timer Three (Proj) Latest: ${three}`;
  }
).subscribe(console.log);

( StackBlitz )

Notice that timerTwo and timerThree are undefined for the first call. So what you can do is now wrap your logic in a check:

// RxJS v6+
import { timer, combineLatest } from 'rxjs';

const timerOne$ = timer(1000, 4000);
const timerTwo$ = timer(2000, 4000);
const timerThree$ = timer(3000, 4000);

combineLatest(
  timerOne$,
  timerTwo$,
  timerThree$,
  // combineLatest also takes an optional projection function
  (one, two, three) => {
    if (!!one && !!two && !!three) {
        return `Timer One (Proj) Latest: ${one}, 
              Timer Two (Proj) Latest: ${two}, 
              Timer Three (Proj) Latest: ${three}`;
    }
  }
).subscribe(console.log);
Z. Bagley
  • 8,942
  • 1
  • 40
  • 52
-1

Here's the work around for this. You will need to import first pipe for this.

When A sets ObsAData it will set a flag which is subscribed by B so that B knows whether ObsAData value is set? or get informed when value is set to call the CreateNewBData function.

    ObsAData: any = null;
    ObsBData: any = null;
    ADataSet$ = new BehaviourSubject<boolean>(false);
    
    ngOnInit() {
      ObsA.subscribe(dA => {
        this.ObsAData = dA;
        if (!this.AdataSet$.value) {
          this.AdataSet$.next(true) // set value so that second subscription would get noticed
        }
      });
    
      ObsB.subscribe(dB => {
        if (!!dB) {
          //do work
          this.ObsBData = dB;
        } else {
          if (!this.ObsAData) {
            this.ADataSet$.pipe(first(r => !!r)).subscribe(e => { // will execute once only when data received
              this.AdataSet$.next(false); // reset the flag
              this.CreateNewBData(this.ObsAData);
            })
          } else {
            this.CreateNewBData(this.ObsAData);
          }
        }
      });
    }
Ajeet Eppakayala
  • 1,186
  • 1
  • 10
  • 17
  • This is exactly the kind of thing you don't ever want to do with async programming. This becomes an uncontrollable mess. – Ingo Bürk Aug 22 '20 at 08:56
  • @IngoBürk I dont think there's anything uncontrollable here. I'd prefer this instead of a non-functioning code. If yuo could suggest an answer if you have any better solution? – Ajeet Eppakayala Aug 22 '20 at 09:04