2

I have a source stream merged from two streams. When the source stream emit event I'd like to call a subscription function Meteor.subscribe and keep it open, so I use mergeMap. When subscription is ready I pipe to another mergeMap to populate the data. It works well until I do 100 clicks and memory consumption is skyrockets. The question is, how is it possible to limit mergeMap, not to the first N subscriptions by concurrent: Number, but to the N recent ones, like a sliding window?

function paginationCache$(): Observable<any> {

    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes
                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('my/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);

                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                });

            })
        );
}

I'd like to give more detailed explanation what happening in that code.

In my example, source stream (the merge before pipe) it's never completes as long as I click button in my web interface, so it emits changes as I click next or previous button in my interface. First mergeMap gets changes from the source stream and sends them to backend API (which also has conflicted naming publication/subscription). So when data available on the client I call observer.next(subscription) to move to the second mergeMap, but I can't destroy or stop meteor's subscription. Two reasons: 1. I'd like to get realtime changes to selected data, 2. if I stop meteor's subscription, data on the client side will be removed. So, now second mergeMap it continuously updates selected data if it was updated on the server.

So after each UI button click (next, previous) I have new chain of subscriptions. It is okey if the original data table not big (1000 records) and I just clicked couple times. But, I can have more than that 30000 and I can click my buttons many times.

So, the idea is to make mergeMap like a limited size queue that holds just last N subscriptions, but queue is changing all the time I click the button.

LAST EDIT: working code:

function paginationCache$(): Observable<any> {
    const N = 3;
    const subscriptionsSubject = new Subject();
    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes

                subscriptionsSubject.next();

                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('mu/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);
                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                }).pipe(
                    takeUntil(subscriptionsSubject
                        .pipe(
                            take(N),
                            filter((_, idx) => idx === N - 1)
                        )
                    )
                );
            })
        );
}

Andrey Kartashov
  • 1,368
  • 1
  • 12
  • 20

2 Answers2

1

Without considering your snippet, here's how I'd go about this:

not to the first N subscriptions by concurrent: Number, but to the N recent ones, like a sliding window

If I understood correctly, you'd want something like this(assuming N = 3):

N = 3

Crt             |  1 |  2 |  3 |
Subscriptions   | S1 | S2 | S3 |


When Crt = 4

Crt           | 2  | 3  |  4 |
Subscriptions | S2 | S3 | S4 |

If that's the case, here's how I'd solve it:

const subscriptionsSubject = new Subject();

src$.pipe(
  mergeMap(
    data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
      .pipe(
        takeUntil(
          subscriptionsSubject.pipe(
            take(N), // After `N` subscriptions, it will complete
            filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
          )
        )
      )
  )
)
Andrei Gătej
  • 11,116
  • 1
  • 14
  • 31
0

I have two ideas here:

  1. You're not completing the second inner Observable. I guess this shouldn't be the source of your problem but it's better to complete observers if you can:

    return () => {
      subscription.stop();
      observer.complete();
    };
    
  2. You can use bufferCount to make a sliding window of Observables and then subscribe to them with switchMap(). Something along these lines:

    import { of, range } from 'rxjs'; 
    import { map, bufferCount, switchMap, shareReplay, tap } from 'rxjs/operators';
    
    range(10)
      .pipe(
        // turn each value to an Observable
        // `refCount` is used so that when `switchMap` switches to a new window
        // it won't trigger resubscribe to its sources and make more requests.
        map(v => of(v).pipe(shareReplay({ refCount: false, bufferSize: 1 }))),
        bufferCount(3, 1),
        tap(console.log), // for debugging purposes only
        switchMap(sourcesArray => merge(...sourcesArray)),
      )
      .subscribe(console.log);
    

    Live demo: https://stackblitz.com/edit/rxjs-kuybbs?devtoolsheight=60

    I'm not completely sure this simulates your use-case but I tried to include also shareReplay so that it won't trigger multiple Meteor.subscribe calls for the same Observable. I'd have to have a working demo of your code to test it myself.

martin
  • 93,354
  • 25
  • 191
  • 226
  • O!. bufferCount - from documentation looks promising. End, I can't complete the second Observable. Meteor framework can bring liveChanges. if switchMap could hold not the latest but latest 3 :) – Andrey Kartashov Apr 21 '20 at 19:00
  • With `bufferCount(3, 1)` the sliding windows will overlap so one Observable will always appear in 3 consecutive windows. – martin Apr 21 '20 at 19:58
  • I played a bit. buffersCount overlap maybe not a problem I always can just use sourcesArray[2]. The problem that switchMap will unsubscribe and then resubscribe. I need it kind of continues. Like lets say first time button pressed data should be emitted. Second the same. Third the same. Forth the same, But first needs unsubscribe event. – Andrey Kartashov Apr 21 '20 at 20:09
  • That's why I added `shareReplay({ refCount: false })` so it should make only one subscription to its source and keep it subscribed even when `switchMap` unsubscribes and resubscribes. – martin Apr 23 '20 at 10:03
  • Your solution is overcomplicated, and brings some side effects like bufferCount has to buffer 3 events before releases them. So either starts with []. Your mapping maps to observable and then you use switch map and switch map will cancel and start again. The point is I have to process triplets not like continuously one by one. – Andrey Kartashov Apr 23 '20 at 18:02