6

I'm working on a custom rxjs operator for extending the bufferTime operator functionality. BufferTime takes timespan and optionally max size of the buffer, in my case however, I need to reset buffer not only based on time & size but also on the additional trigger - routeChange.

I've created a custom operator to handle that, it takes timeSpan, maxSize and trigger$, creates observables for timeSpan buffer and size buffer, and finally merges those 3 together and pass as a single notifier for the buffer operator.

Here is the source code:

function bufferFrom<T>(...triggers: Observable<any>[]): (source$: Observable<T>) => Observable<T[]> {
   // restart Subject used for taking control over all triggers and resetting them on emit
   const restart: Subject<void> = new ReplaySubject<void>(1);

   // function that wraps trigger with restart Observable to take control over that
   const withRestart: <R>(source: Observable<R>) => Observable<R> = <R>(source: Observable<R>) => restart.asObservable().pipe(switchMap(() => source));

   // create Observable from all triggers connected with restart control
   const bufferTrigger = merge(...triggers.map(withRestart));

   // emit first reset to enable Observables initially
   restart.next();
 
   // return the source$ Observable with buffer, and reset applied after buffer is emitted
   return source$ => source$.pipe(
      buffer(bufferTrigger),
      tap(() => restart.next()),
      filter(items => items.length > 0),
   );
}

export function bufferAll<T>(bufferTimeSpan: number, bufferSize: number, trigger$: Observable<any>): (source$: Observable<T>) => Observable<T[]> {
   return (source$) => {
      const size$ = source$.pipe(bufferCount(bufferSize));
      const timeSpan$ = source$.pipe(bufferTime(bufferTimeSpan * 1000));

      return source$.pipe(bufferFrom(size$, timeSpan$, trigger$));
   };
}

The issue I have is that in Angular app using such operator in Service provided in root causes the app to be unstable which blocks rendering of the App.

Here is how I tried to run the code outside of the Angular Zone, but that is not working in my case. However, if I use a simple bufferTime instead of my implementation it works fine with the code below.

 this.zone.runOutsideAngular(() => {
    const routeChanges$ = this.router.events.pipe(filter(event => event instanceof NavigationEnd));

    this.dataSubject.asObservable().pipe(
      // Send after 60 seconds or when 10 items in buffer or when route changes
      bufferAll(60, 10, routeChanges$),
    ).subscribe((data) => {
      this.zone.run(() => console.log(data));
    });
});

I'm not sure why my operator blocks Angular, and bufferTime doesn't, because they both are creating infinite Observables based on the time/intervals, which according to Angular docs https://angular.io/api/core/ApplicationRef#isstable-examples-and-caveats could be a problem that blocks render and app stable state.

Any idea what could be possibly wrong in my implementation, what have I missed?

Added a short stackblitz reproduction

data.service.ts includes custom operator code as well as normal usage of bufferTime or bufferCount operators. When using bufferTime or bufferCountoperators app is being stable, if commented out and switched to my operator is becoming permanently unstable.

Maciej Wojcik
  • 2,115
  • 2
  • 28
  • 47
  • Maybe this helps https://stackoverflow.com/a/50785257/19768317, By the way your restart ReplaySubject never complete. – Eddy Lin Sep 08 '22 at 15:34

0 Answers0