I have a source stream merged from two streams. When the source stream emit event I'd like to call a subscription function Meteor.subscribe
and keep it open, so I use mergeMap
. When subscription is ready I pipe to another mergeMap
to populate the data. It works well until I do 100 clicks and memory consumption is skyrockets. The question is, how is it possible to limit mergeMap, not to the first N subscriptions by concurrent: Number
, but to the N recent ones, like a sliding window?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
I'd like to give more detailed explanation what happening in that code.
In my example, source stream (the merge
before pipe) it's never completes as long as I click button in my web interface, so it emits changes as I click next or previous button in my interface. First mergeMap
gets changes from the source stream and sends them to backend API (which also has conflicted naming publication/subscription). So when data available on the client I call observer.next(subscription)
to move to the second mergeMap
, but I can't destroy or stop meteor's subscription. Two reasons: 1. I'd like to get realtime changes to selected data, 2. if I stop meteor's subscription, data on the client side will be removed. So, now second mergeMap
it continuously updates selected data if it was updated on the server.
So after each UI button click (next, previous) I have new chain of subscriptions. It is okey if the original data table not big (1000 records) and I just clicked couple times. But, I can have more than that 30000 and I can click my buttons many times.
So, the idea is to make mergeMap like a limited size queue that holds just last N subscriptions, but queue is changing all the time I click the button.
LAST EDIT: working code:
function paginationCache$(): Observable<any> {
const N = 3;
const subscriptionsSubject = new Subject();
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
subscriptionsSubject.next();
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
}).pipe(
takeUntil(subscriptionsSubject
.pipe(
take(N),
filter((_, idx) => idx === N - 1)
)
)
);
})
);
}