-1

I am doing a bit of a cascade delete with multiple service calls. Some of the later subscriptions rely on previous subscriptions to finish. How can I guarantee an subscription finishes before moving onto my next code?

// Need to make sure this code completes
data.forEach(element => {
    this.myService.delete(element.id).subscribe();
});

// Before running this code
this.myService.getAll().subscribe(res => {
        res.data.forEach(element => {
            this.myService.delete(element.id).subscribe();
        });
    }
);
Blake Rivell
  • 13,105
  • 31
  • 115
  • 231
  • 2
    https://www.learnrxjs.io/operators/combination/forkjoin.html – JB Nizet Jun 26 '18 at 16:17
  • 2
    Possible duplicate of [RXJS Wait for all observables in an array to complete (or error)](https://stackoverflow.com/questions/41734921/rxjs-wait-for-all-observables-in-an-array-to-complete-or-error) – JB Nizet Jun 26 '18 at 16:18
  • @JBNizet After reviewing the posts I am not understanding how to implement the syntax in my scenario. – Blake Rivell Jun 26 '18 at 16:27
  • There is an example in the duplicate I linked to. But anyway... `const deletions = data.map(element => this.businessEventService.delete(element.id)); forkJoin(...deletions).pipe(switchMap(() => this.myService.getAll())).subscribe(...)` – JB Nizet Jun 26 '18 at 16:31
  • https://blog.angularindepth.com/practical-rxjs-in-the-wild-requests-with-concatmap-vs-mergemap-vs-forkjoin-11e5b2efe293 – Vikas Jun 26 '18 at 16:31
  • 1
    @JBNizet I think my problem to begin with is that I should be doing this work within my service. Is it even possible to do this in the component using two separate service calls where each service makes an http request and returns an observable? – Blake Rivell Jun 26 '18 at 17:06
  • @JBNizet I am pretty sure I found the answer to my question and it has nothing to do with what you guys were saying so I must have explained it wrong. I simple had to put the code I wanted to run after the subscription completed in the complete param of the initial subscription. – Blake Rivell Jun 26 '18 at 17:22
  • @BlakeRivell If you're using this many subscribes or doing work within your subscribes, you're using Rx well. Trust me on this - if you don't learn to use Rx correctly you're going to have a very **very** ***very*** bad time. If that's the case you'd be better off not using it at all (and so would everyone else who has to work with your code). – cwharris Jun 26 '18 at 18:13

1 Answers1

2

A Subscription has a singular purposes: disposing, but you have options:

  • If you want to subscribe to observables one after another, you can use concat.

  • If you want to subscribe to multiple observables at the same time and combine the last value of each, you can use forkJoin.

  • If you want to use yielded value of an observable in another observable, you can use flatMap.


import { forkJoin, interval, concat, of } from "rxjs";
import { first, flatMap } from "rxjs/operators";

var combinedIntervals =
    forkJoin(
        interval(1000).pipe(first()),
        interval(2500).pipe(first())
    ).pipe(
        flatMap(([a, b]) => of(`${a} and ${b}`))
    );

concat(
    combinedIntervals,
    of("after both intervals")
)
.subscribe(
    console.log.bind(console)
);


// 0 and 0
// after both intervals

For you specific case, you'd select your delete operations as observables and then forkJoin them.

var data = [];

var obsBatch1 = data.map(element => myService.delete(element.id));
var obsBatch2 =
    forkJoin(
        obsBatch1,
        elements => elements.map(
            element => myService.delete(element.id)
        )
    );

obsBatch2.subscribe();

This is rxjs@6 syntax. I leave rxjs@5 as an exercise.

cwharris
  • 17,835
  • 4
  • 44
  • 64
  • In a scenario where I need to call myService.GetProducts(), loop through and call myService.DeleteProduct() on each one. How can I guarantee all individual deletes were finished before proceeding in the code with another subscription? – Blake Rivell Jun 26 '18 at 17:46
  • 1
    Using the forkJoin function, as explained in the duplicate, in my comment, and in the link I gave you. – JB Nizet Jun 26 '18 at 17:48
  • 1
    map the items to delete operations as observables and `forkJoin` them. ` `forkJoin(items.map( – cwharris Jun 26 '18 at 17:48
  • @cwharris thank you so much. Then finally how do I move on to further subscriptions to other service functions that rely on all of the deletes to be processed? Basically saying: When all these deletes are finished, now call myService.DeleteSomethingElse(). I think you already answered this in the bottom of your post. forkJoin will make sure all Batch1 deletes are complete? – Blake Rivell Jun 26 '18 at 18:18
  • Each time you need to compute things concurrently, you'll use forkJoin. Each time you need to compute things serially, you'll use concat. Rx is an algebra, and algebra is useful. Algebra is hard to use if you don't know algebra. I'd suggest learning - within an isolated environment - what the Rx operators do and what you can do with them. Otherwise you're going to be constantly asking a tutor to help you cram for your algebra tests. Learning Rx is a good investment. The mindsets you learn from Rx will apply to all async and concurrent programming models. – cwharris Jun 26 '18 at 18:25
  • 1
    @BlakeRivell `forkJoin` = an observable which yields the last value from all it's observables. So yes, all of the observables must complete before `forkJoin` will yield, and `forkJoin` only yields once. As to whether `forkJoin` will yield when one or more of it's observables complete *but have not yielded any value* is dependent on which version of Rx you're using. Play with it to find out. – cwharris Jun 26 '18 at 18:30
  • @cwharris Got ya, I didn't realize it was that complicated. I am definitely going to read up on it thoroughly. I created another post related to this which I shouldn't have that explains my scenario better. Is there anyway you can take a look at the sudo code section of the post and touch up your answer in this post or answer in that post. https://stackoverflow.com/questions/51048759/calling-observables-synchronously-one-after-another-in-angular?noredirect=1#comment89089154_51048759 – Blake Rivell Jun 26 '18 at 18:32
  • 1
    @BlakeRivell An `Observable` can be thought of as a `Task`/`Promise` which can yield multiple results. Likewise a `Task`/`Promise` is just an `Observable` that returns a single result. In C# you can even `var one = await Observable.Return(1);` (it'll throw if it yields anything but a single value). `forkJoin`/`CombineLatest` is just `Task.WhenAll(...).Select(x => x.Result);`, and `concat`/`Concat` is just `foreach (var task in tasks) { await task; /* do something */ }. Except observable can yield multiple results, so that'll need to be accounted for. But if you need that you'll figure it out. – cwharris Jun 26 '18 at 18:39
  • @cwharris thank you for the awesome c# comparisons this will definitely help me out. I was to mark your answer as correct because it will probably help someone else, but is there anyway you can show me how you would write that sudo code given the services that I have in my other post. It will really help me understand how and when to use each function. Also, just so you know I am unable to modify my services that make the direct http requests. I am only able to work in the component. Also I am using rxjs5 – Blake Rivell Jun 26 '18 at 18:47
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/173839/discussion-between-blake-rivell-and-cwharris). – Blake Rivell Jun 26 '18 at 18:58