I have an Observable where each new value should trigger an HTTP request. On the client side I only care about the latest response value; however, I want every request to complete for monitoring and similar purposes.
What I currently have is something like:
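import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

function simulate(x) {
  // Simulate an HTTP request that responds after 6 ms.
  return of(x).pipe(delay(6));
}

// someMapFunc is a placeholder for the flattening operator in question.
source$.pipe(
  someMapFunc(x => simulate(x)),
);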
When I use switchMap for someMapFunc, I get the right set of responses (only the latest). However, if a request takes long enough that a new value arrives first, it gets canceled.
When I use mergeMap instead, I get the right set of requests (every request completes), but I get the wrong set of responses (every single one).
Is there a way to get the requests of mergeMap with the responses of switchMap? I know I can write this as a custom operator, but I'm wondering if I can build this out of existing/standard RxJS operators. To summarize what I'm thinking of:
- a version of switchMap that doesn't unsubscribe when it switches;
- a version of mergeMap that only emits values from the latest inner Observable.
Edit: Based on the accepted answer, I was able to get the following, which works:
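import { defer } from 'rxjs';
import { filter, mergeMap } from 'rxjs/operators';

function orderedMergeMap(project) {
  // defer() gives each subscription its own `recent` counter.
  return (s) => defer(() => {
    let recent = 0;
    return s.pipe(
      mergeMap((data, idx) => {
        // Remember the index of the newest source value.
        recent = idx;
        // Every inner Observable keeps running, but stale ones are muted.
        return project(data).pipe(filter(() => idx === recent));
      })
    );
  });
}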
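To make the idx === recent check easier to reason about, here is a rough plain-JavaScript sketch of the same gating idea with no RxJS involved. The names latestOnly and fakeRequest are hypothetical helpers I made up for illustration, and the callback-style "request" is an assumption that stands in for a real HTTP call; it is only meant to show the behavior I was after, not to replace the operator above.

```javascript
// Hypothetical helper (not part of RxJS): every request runs to completion,
// but only the response of the most recently started request is delivered.
function latestOnly(request, onResponse) {
  let recent = -1; // index of the most recently started request
  let nextIdx = 0;
  return function send(value) {
    const idx = nextIdx++;
    recent = idx;
    request(value, (response) => {
      // Same gate as filter(() => idx === recent) in orderedMergeMap.
      if (idx === recent) onResponse(response);
    });
  };
}

// Fake transport for the demo (an assumption): it records each request and
// lets us finish it by hand, so the ordering is fully deterministic.
const pending = [];
const completed = [];
function fakeRequest(value, done) {
  pending.push(() => {
    completed.push(value); // the request itself always finishes
    done(`response:${value}`);
  });
}

const delivered = [];
const send = latestOnly(fakeRequest, (r) => delivered.push(r));

send('a');
send('b');
pending[0](); // 'a' finishes after 'b' was sent: completed, but not delivered
pending[1](); // 'b' is the latest: completed and delivered
```

After this runs, completed holds both 'a' and 'b' (the mergeMap side of what I wanted), while delivered holds only 'response:b' (the switchMap side).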