Let's say I want to download 10,000 files. I can easily build a queue of those 10,000 files (happy to take advice if any of this can be done better):

import request from 'request-promise-native';
import {from} from 'rxjs';

let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
  reqs.push(
    from(request(`http://bleh.com/${i}`))
  )
}

Now I have an array of RxJS observables, created from promises, that represents my queue. The behavior I want is:

  • Three concurrent requests to the server at a time
  • Upon completion of a request, a new request fires.

I can create a solution to this problem myself, but in light of things like the RxJS queue, which I've never used, I'm wondering what the most idiomatic RxJS way to do this is.

Evan Carroll
  • Have a look at https://github.com/cartant/rxjs-etc/blob/master/source/observable/forkJoinConcurrent.ts – cartant Jan 18 '19 at 00:59
  • @cartant That code is really awesome, could you annotate a bit and explain what's going on -- if not on the Github repo, here? I think I could learn from it. – Evan Carroll Jan 18 '19 at 02:05

2 Answers

It sounds like you want an equivalent of forkJoin that supports a caller-specified maximum number of concurrent subscriptions.

It's possible to re-implement forkJoin using mergeMap and to expose the concurrent parameter, like this:

import { from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";

export function forkJoinConcurrent<T>(
  observables: Observable<T>[],
  concurrent: number
): Observable<T[]> {
  // Convert the array of observables to a higher-order observable:
  return from(observables).pipe(
    // Merge each of the observables in the higher-order observable
    // into a single stream:
    mergeMap((observable, observableIndex) => observable.pipe(
      // Like forkJoin, we're interested only in the last value:
      last(),
      // Combine the value with the index so that the stream of merged
      // values - which could be in any order - can be sorted to match
      // the order of the source observables:
      map(value => ({ index: observableIndex, value }))
    ), concurrent),
    // Convert the stream of last values to an array:
    toArray(),
    // Sort the array of value/index pairs by index - so the value
    // indices correspond to the source observable indices and then
    // map the pair to the value:
    map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
  );
}
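
To see what the `concurrent` argument of `mergeMap` does on its own, here is a minimal sketch that is not part of the original answer. It assumes hand-controlled `Subject`s as hypothetical stand-ins for the HTTP requests, and the names `pending`, `tasks` and `started` are made up for illustration. It shows that only three inner observables are subscribed at first, and that a fourth starts as soon as one of them completes:

```typescript
import { defer, from, Observable, Subject } from "rxjs";
import { mergeMap } from "rxjs/operators";

// Hypothetical stand-ins for five HTTP requests. Each Subject is
// completed by hand so the ordering is fully deterministic.
const pending: Subject<string>[] = Array.from(
  { length: 5 },
  () => new Subject<string>()
);

// Counts how many "requests" have actually been started (subscribed to).
let started = 0;

// defer() postpones the work until mergeMap subscribes to the task.
const tasks: Observable<string>[] = pending.map(subject =>
  defer(() => {
    started++;
    return subject;
  })
);

const results: string[] = [];

// The second argument to mergeMap caps the number of inner
// observables that are subscribed at the same time.
from(tasks)
  .pipe(mergeMap(task => task, 3))
  .subscribe(value => results.push(value));

// Only the first three tasks have been started at this point.
const startedInitially = started;

// Finish the second task: its slot is freed and the fourth task starts.
pending[1].next("response 1");
pending[1].complete();

const startedAfterOneCompletion = started;
```

With real requests the completions arrive asynchronously, but the bookkeeping should be the same: a buffered observable is subscribed each time an active one completes.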
cartant
I ran into this same problem in 2021 and was able to build on @cartant's answer, so I thought I'd share:

index.ts

import request from 'request-promise-native';
import { from, defer } from "rxjs";
import { forkJoinConcurrent } from './forkJoinConcurrent';

const handleRequest = async (id: string) => await request.get(`http://bleh.com/${id}`, { json: true });

const ids: string[] = [...Array(10000).keys()].map((k: number) => k.toString());

const concurrent: number = 3;

/* use `defer` instead of `from` to generate the Observables. 
 `defer` uses a factory to generate the promise and it will execute 
 the factory only when it is subscribed to */

const observables = ids.map((id: string) => defer(() => from(handleRequest(id))))

forkJoinConcurrent<any>(observables, concurrent).subscribe(value => console.log(value));
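
The difference between `from` and `defer` mentioned in the comment above can be checked with a small sketch. It assumes a hypothetical `fakeRequest` function in place of `handleRequest`, with a counter (`calls`) that records how often the promise factory runs:

```typescript
import { defer, from } from "rxjs";

let calls = 0;

// Hypothetical stand-in for handleRequest: counts every invocation.
const fakeRequest = (id: string): Promise<string> => {
  calls++;
  return Promise.resolve(`response ${id}`);
};

// from() receives a promise that already exists, so the "request"
// has been issued before anything subscribes.
const eager = from(fakeRequest("a"));
const callsAfterFrom = calls;

// defer() only stores the factory; nothing has run yet.
const lazy = defer(() => fakeRequest("b"));
const callsAfterDefer = calls;

// Subscribing to the deferred observable runs the factory.
lazy.subscribe();
const callsAfterLazySubscribe = calls;

// Subscribing to the eager observable reuses the existing promise
// and does not call fakeRequest again.
eager.subscribe();
const callsAfterEagerSubscribe = calls;
```

This is why building 10,000 observables with `from(request(...))` would start all 10,000 requests immediately, regardless of the `concurrent` value passed to `mergeMap`, whereas the `defer` version leaves it to `mergeMap` to decide when each request begins.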

forkJoinConcurrent.ts

import { from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";

export function forkJoinConcurrent<T>(
  observables: Observable<T>[],
  concurrent: number
): Observable<T[]> {
  // Convert the array of observables to a higher-order observable:
  return from(observables).pipe(
    // Merge each of the observables in the higher-order observable
    // into a single stream:
    mergeMap((observable, observableIndex) => observable.pipe(
      // Like forkJoin, we're interested only in the last value:
      last(),
      // Combine the value with the index so that the stream of merged
      // values - which could be in any order - can be sorted to match
      // the order of the source observables:
      map(value => ({ index: observableIndex, value }))
    ), concurrent),
    // Convert the stream of last values to an array:
    toArray(),
    // Sort the array of value/index pairs by index - so the value
    // indices correspond to the source observable indices and then
    // map the pair to the value:
    map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
  );
}
Andrew Allison