-1

I am facing a RxJS issue.

My app is currently designed as follows: I have two different clients: ClientA & ClientB that subscribe to two different Observables: ObservableA & ObservableB.

Note that the app also mutates a variable called aVariable.

Here is the flow:

  1. ClientA subscribes to ObservableA.
  2. ClientB subscribes to ObservableB.
  3. ObservableB subscription read false from aVariable and completes.
  4. ObservableA subscription sets aVariable to true and completes (later than ObservableB).

Whereas what is really intended was for ObservableA's subscription to complete before ObservableB's so that ClientB would read true from aVariable... Or to put it another way, somehow ensure that ObservableB's subscription waits till the other subscription has completed.

I am not sure what RxJS construct to use in order to achieve what I want (I currently use plain Observables). I believe I need more than plain Observables here...

Can someone please help?

P.S. Note that aVariable is held in a ngrx store but I don't think that is relevant to this issue...

P.P.S. The above is a simplification of my real app.

balteo
  • 23,602
  • 63
  • 219
  • 412
  • Usually when you want to run asynchronous tasks in order you'd use `concat` or `concatMap`. See maybe similar question http://stackoverflow.com/questions/39566268/angular-2-rxjs-how-return-stream-of-objects-fetched-with-several-subsequent/39578646#39578646 and http://stackoverflow.com/questions/36713531/how-to-use-exhaustmap-in-reactivex-rxjs-5-in-typescript/39589408#39589408 – martin Sep 27 '16 at 20:03
  • Hi Martin. The only trouble is that those are different clients. Is is not possible to use concat therefore. – balteo Sep 27 '16 at 20:47
  • Can you give a code sample of what you mean of how fits together? I'm not clear on: What is `Client A`? What do you mean you complete the subscription? What you mean by a "plain" `Observable`. – paulpdaniels Sep 27 '16 at 22:27
  • I meant `ClientA` is located in another class that `ClientB`. By plain Observable I meant an Observable as opposed to all to the other constructs/classes that extend Observable such as ConnectedObservable etc. (http://reactivex.io/rxjs/). – balteo Sep 28 '16 at 16:20
  • By completing I mean: http://reactivex.io/rxjs/class/es6/Subscriber.js~Subscriber.html#instance-method-complete – balteo Sep 28 '16 at 16:24
  • @paulpdaniels: Does that answer your questions? Can I provide any other information? – balteo Sep 29 '16 at 15:27
  • @balteo does my proposed answer work for you? – Mark van Straten Jan 20 '17 at 20:04
  • 1
    @MarkvanStraten Thanks. Yes it does indeed. Sorry I had missed your reply. – balteo Jan 20 '17 at 20:09

1 Answers1

1

I think you can solve your problem with a intermediate Subject in which you emit a value when streamB gets subscribed to:

const completeStreamA = new Rx.Subject();

const streamA = Rx.Observable.never()
  .takeUntil(completeStreamA);

const streamB = Rx.Observable.of('aValueOnStreamB')
  .do(() => completeStreamA.next('complete stream A'));

//clientA subscribes immediately
streamA.subscribe(
  next => console.log('a->next->'+next),
  err => console.log('a->error->' + err.message),
  () => console.log('a->complete')
);

setTimeout(() => {
  //simulate later subscription by clientB
  streamB.subscribe(
    next => console.log('b->next->'+next),
    err => console.log('b->error->' + err.message),
    () => console.log('b->complete')
  );
}, 3 * 1000);
  
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

Only after the streamB gets subscribed to it will next a value into the completeStreamA subject which will complete streamA. The output of above code:

a->complete
b->next->aValueOnStreamB
b->complete
Mark van Straten
  • 9,287
  • 3
  • 38
  • 57