I want to create a queue that processes the items pushed into it asynchronously, with at most, say, 3 running concurrently.

I currently have the concurrency part sorted, but how do I create an observable queue into which I can push observables and subscribe to the output?

Here's the code for static concurrency. It lets at most 3 promises be in flight at any time.

    import { defer, from } from 'rxjs';
    import { mergeAll } from 'rxjs/operators';

    // Resolves with the original value after 1s
    function getData(x: string): Promise<string> {
        return new Promise((resolve) => {
            setTimeout(() => resolve(x), 1000);
        });
    }

    // Declared before the subscription that pushes into it
    const data = [
        {
            name: 'Jon start'
        }
    ];

    const ids = [...Array(20).keys()]; // 0, 1, 2, ... 17, 18, 19

    // defer() keeps each call lazy: getData only runs once mergeAll subscribes
    const observables = ids.map((x) => defer(() => getData('John ' + x)));

    from(observables)
        .pipe(mergeAll(3)) // at most 3 inner observables subscribed at once
        .subscribe((d) => {
            data.push({
                name: d
            });
        });

Harsh
  • I believe I've already answered what you're looking for here: https://stackoverflow.com/questions/59922545/rxjs-how-to-merge-observables-dynamically/59922635#59922635 – maxime1992 Jul 17 '23 at 07:49
  • See if you could use the snippet [here](https://stackoverflow.com/a/62872189/6513921). To suit your use case, you'd only need to replace the `from(this.urls)` in the snippet with a multicast observable, e.g. `queue$ = new ReplaySubject(1)`. You could then push your source observables as input to the queue, e.g. `queue$.next(sourceObs$)`. Here's a [Stackblitz](https://stackblitz.com/edit/angular-ivy-t4mxbq?file=src%2Fapp%2Fapp.component.ts) you could play around with. – ruth Jul 17 '23 at 07:51
  • Thanks guys! I've posted the solution I was able to come to. – Harsh Jul 22 '23 at 06:05

1 Answer

Thanks to @ruth and @maxime1992 in the comments, I was able to arrive at this solution:

    import { Observable, Subject, defer } from 'rxjs';
    import { mergeAll, takeUntil } from 'rxjs/operators';

    // Resolves with the original value after 1s
    function getData(x: string): Promise<string> {
        return new Promise((resolve) => {
            setTimeout(() => resolve(x), 1000);
        });
    }

    const queue$: Subject<Observable<string | undefined>> = new Subject();
    // terminate$ is used as a stop signal
    const terminate$ = new Subject<void>();

    // 3 is the concurrency limit
    const throttledQueue$ = queue$.pipe(mergeAll(3));

    throttledQueue$
        .pipe(takeUntil(terminate$))
        .subscribe((data) => console.log('Combined Stream Output:', data));

    // To insert something into the queue:
    queue$.next(defer(() => getData('Jon Don')));

    // To unsubscribe and stop processing:
    terminate$.next();
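
For anyone wondering what the `Subject` plus `mergeAll(3)` plus `takeUntil` combination is doing, here is a rough dependency-free sketch of the same idea. It is not RxJS and not how RxJS implements it: `TaskQueue`, `push`, `stop`, and `demo` are hypothetical names made up for illustration, and unlike `mergeAll` this version swallows task failures instead of erroring the stream.

```typescript
// Dependency-free model of "push lazily, run at most `limit` at once, allow stopping".
// TaskQueue and all of its members are hypothetical names, not RxJS API.
type Task<T> = () => Promise<T>;

class TaskQueue<T> {
    private pending: Task<T>[] = []; // tasks waiting for a free slot
    private active = 0;              // tasks currently in flight
    private stopped = false;
    private limit: number;
    private onResult: (value: T) => void;

    constructor(limit: number, onResult: (value: T) => void) {
        this.limit = limit;
        this.onResult = onResult;
    }

    // Rough counterpart of queue$.next(defer(() => ...)): the task is lazy
    // and only starts once a slot is free.
    push(task: Task<T>): void {
        if (this.stopped) return;
        this.pending.push(task);
        this.drain();
    }

    // Rough counterpart of terminate$.next(): drop waiting tasks and
    // ignore results that arrive afterwards.
    stop(): void {
        this.stopped = true;
        this.pending = [];
    }

    private drain(): void {
        while (!this.stopped && this.active < this.limit && this.pending.length > 0) {
            const task = this.pending.shift()!;
            this.active++;
            // Wrapping in a Promise turns a synchronous throw into a rejection.
            new Promise<T>((resolve) => resolve(task()))
                .then((value) => {
                    if (!this.stopped) this.onResult(value);
                })
                .catch(() => {
                    // Simplification: failures (including errors thrown by
                    // onResult) are swallowed so the queue keeps running.
                })
                .then(() => {
                    this.active--;
                    this.drain(); // a slot freed up, start the next waiting task
                });
        }
    }
}

// Usage sketch: 7 tasks with a limit of 3; records the highest number in flight.
function demo(): Promise<{ results: string[]; peak: number }> {
    return new Promise((resolve) => {
        const total = 7;
        const results: string[] = [];
        let inFlight = 0;
        let peak = 0;
        const queue = new TaskQueue<string>(3, (value) => {
            results.push(value);
            if (results.length === total) resolve({ results, peak });
        });
        for (let i = 0; i < total; i++) {
            queue.push(async () => {
                inFlight++;
                peak = Math.max(peak, inFlight);
                await new Promise<void>((r) => setTimeout(r, 20));
                inFlight--;
                return 'John ' + i;
            });
        }
    });
}
```

Wrapping the work in a function (here) or in `defer` (in the RxJS version) is what makes the limit meaningful: a promise that has already been created is already running, so the queue can only hold back work that has not started yet.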

Harsh