I wish the answer was forkJoin / Promise.all, but it's a bit more involved, so please bear with me.
I have a source of promises that can arrive in random order, and I need some way to say "when all promises that arrived so far are done, let me know".
In a Promise-based solution, I initially thought of using Promise.all, but promises can still be "arriving" while others haven't completed. Interestingly, there is a neat workaround for an "iterable Promise.all" at https://stackoverflow.com/a/37819138/239168
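To make the "iterable Promise.all" idea concrete, here is a rough sketch of how I understand that kind of workaround. It is not the linked answer's exact code: the helper name whenAllSoFar and the shared pending array are made up for illustration, and rejections are deliberately swallowed so that "done" means "settled".

```typescript
// Hypothetical helper: resolves once every promise pushed into `pending`
// so far (including ones pushed while we were waiting) has settled.
async function whenAllSoFar(pending: Promise<unknown>[]): Promise<void> {
  let seen = 0;
  // Re-check after each wait: more promises may have arrived in the meantime.
  while (seen < pending.length) {
    seen = pending.length;
    // Assumption: a rejection counts as "done", so map it to a resolved value.
    await Promise.all(
      pending.slice(0, seen).map(p => p.catch(() => undefined))
    );
  }
}
```

The caller keeps pushing new promises into the same array; the loop only exits when a full wait finishes without anything new having been added.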
I'm trying to do this the Rx way. After reading the docs for a bit, I think forkJoin is the Promise.all equivalent. However, it has the same problem: there is no point in time where I can safely call forkJoin or Promise.all, as there can always be one more promise added while another is still pending... Since I'm probably not making sense by now, I thought I'd ask for some guidance.
Setup
(hold your laughs if it's silly, I'm new to Rx...)
I have a Subject, and I want to know when all of the promises pushed to it are complete. New promises can also be added to it at any time.
private promiseSource = new Subject<Promise<any>>();
promises$ = this.promiseSource.asObservable();
Every time a new promise "arrives", I just push it to the subject:
this.promiseSource.next(somePromise);
What I would like to magically happen is for the subject to "complete" whenever it holds only completed promises, e.g.
promises$.magicFlatMapForkJoinConcatMapTrickery().subscribe({
  next: x => ...,
  error: err => ...,
  complete: () => {
    console.log('all promises we got so far are done');
    // nice to have, I want this to keep "listening" for new promises
    promiseSource.youAreNotREALYCompletePleaseReset();
  }
});
In other words, I have an observable of async actions. Looking at its contents over time, we can see overlapping async actions, and I want to know when there is no overlap, i.e. when nothing is pending, e.g.
|<-async action 1->|          |<-async action 3->|
      |<-async action 2->|          |<-async action 4->|
                           /\                            /\
                     find this gap
If these were, for example, HTTP calls, I'm basically asking: tell me when there are no open HTTP calls.
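To pin down the behaviour I'm after, here is a plain (non-Rx) sketch. The class PendingTracker and its track method are hypothetical names I made up, not part of RxJS or any library. It counts in-flight promises and fires a callback each time the count drops back to zero, then keeps listening for new ones.

```typescript
// Hypothetical helper (not an RxJS API): tracks in-flight promises and
// calls `onIdle` every time the number of pending promises returns to zero.
class PendingTracker {
  private inFlight = 0;
  private readonly onIdle: () => void;

  constructor(onIdle: () => void) {
    this.onIdle = onIdle;
  }

  // Register a promise; the same promise is returned so callers can chain on it.
  track<T>(promise: Promise<T>): Promise<T> {
    this.inFlight++;
    const settle = (): void => {
      this.inFlight--;
      // This is the "gap" in the diagram: nothing is pending right now.
      if (this.inFlight === 0) this.onIdle();
    };
    // Decrement on both fulfilment and rejection.
    promise.then(settle, settle);
    return promise;
  }
}
```

What I'm looking for is the idiomatic RxJS equivalent of this counter, driven by the promises$ stream instead of explicit track calls.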
tl;dr
How do I implement the Promise-based answer linked above in the RxJS world?