I'm working on a custom rxjs operator for extending the bufferTime
operator functionality.
BufferTime takes timespan and optionally max size of the buffer, in my case however, I need to reset buffer not only based on time & size but also on the additional trigger - routeChange.
I've created a custom operator to handle that, it takes timeSpan, maxSize and trigger$, creates observables for timeSpan buffer and size buffer, and finally merges those 3 together and pass as a single notifier for the buffer
operator.
Here is the source code:
function bufferFrom<T>(...triggers: Observable<any>[]): (source$: Observable<T>) => Observable<T[]> {
// restart Subject used for taking control over all triggers and resetting them on emit
const restart: Subject<void> = new ReplaySubject<void>(1);
// function that wraps trigger with restart Observable to take control over that
const withRestart: <R>(source: Observable<R>) => Observable<R> = <R>(source: Observable<R>) => restart.asObservable().pipe(switchMap(() => source));
// create Observable from all triggers connected with restart control
const bufferTrigger = merge(...triggers.map(withRestart));
// emit first reset to enable Observables initially
restart.next();
// return the source$ Observable with buffer, and reset applied after buffer is emitted
return source$ => source$.pipe(
buffer(bufferTrigger),
tap(() => restart.next()),
filter(items => items.length > 0),
);
}
export function bufferAll<T>(bufferTimeSpan: number, bufferSize: number, trigger$: Observable<any>): (source$: Observable<T>) => Observable<T[]> {
return (source$) => {
const size$ = source$.pipe(bufferCount(bufferSize));
const timeSpan$ = source$.pipe(bufferTime(bufferTimeSpan * 1000));
return source$.pipe(bufferFrom(size$, timeSpan$, trigger$));
};
}
The issue I have is that in Angular app using such operator in Service provided in root causes the app to be unstable which blocks rendering of the App.
Here is how I tried to run the code outside of the Angular Zone, but that is not working in my case. However, if I use a simple bufferTime
instead of my implementation it works fine with the code below.
this.zone.runOutsideAngular(() => {
const routeChanges$ = this.router.events.pipe(filter(event => event instanceof NavigationEnd));
this.dataSubject.asObservable().pipe(
// Send after 60 seconds or when 10 items in buffer or when route changes
bufferAll(60, 10, routeChanges$),
).subscribe((data) => {
this.zone.run(() => console.log(data));
});
});
I'm not sure why my operator blocks Angular, and bufferTime
doesn't, because they both are creating infinite Observables based on the time/intervals, which according to Angular docs https://angular.io/api/core/ApplicationRef#isstable-examples-and-caveats
could be a problem that blocks render and app stable state.
Any idea what could be possibly wrong in my implementation, what have I missed?
Added a short stackblitz reproduction
data.service.ts includes custom operator code as well as normal usage of bufferTime
or bufferCount
operators. When using bufferTime
or bufferCount
operators app is being stable, if commented out and switched to my operator is becoming permanently unstable.