8

TL;DR - I'm looking for a way to limit the number of concurrent HTTP connections to a REST API when using RxJS.

My Node.js app will make a few thousand REST API calls to a third-party provider. However, if I make all those requests at once, the service might go down or reject my requests as a suspected DDoS attack. So, I want to set the maximum number of concurrent connections at any given time. I used to implement concurrency control with Promises by leveraging the throat package, but I haven't found a similar way to implement this with RxJS.

I tried to use merge with a concurrency of 1, as suggested in this post How to limit the concurrency of flatMap?, but all requests are still sent at once.

Here's my code:

// Question code (uses the older `rx` package). It is intended to fetch the
// URLs below one at a time, but every request is started immediately.
var Rx = require('rx'),
  rp = require('request-promise');

// URLs to fetch.
var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

// Why this does not limit concurrency: `map(httpGet)` calls httpGet (and so
// rp.get) for every URL as soon as it is emitted. rp.get returns a Promise,
// and promises are not lazy, so every request is already in flight before
// `merge(1)` gets to them; the concurrency argument cannot throttle work that
// has already started.
var source = Rx.Observable.fromArray(array).map(httpGet).merge(1);

// Starts an HTTP GET for `url` and returns the request-promise Promise.
function httpGet(url) {
  return rp.get(url);
}

// NOTE(review): `results` is never read or written after this line.
var results = [];
// Log each response, any error, and completion.
var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Community
  • 1
  • 1
Diego
  • 1,678
  • 1
  • 16
  • 20
  • 2
    See http://stackoverflow.com/documentation/rxjs/8247/common-recipes/27973/sending-multiple-parallel-http-requests#t=201703261009146257815 – martin Mar 26 '17 at 10:09

3 Answers3

13

You can use the mergeMap operator to perform the HTTP requests and to flatten the responses into the composed observable. mergeMap takes an optional concurrent parameter with which you can specify the maximum number of concurrently subscribed observables (i.e. HTTP requests):

// Requires the `rxjs` package (RxJS 5): `from` replaces the old `fromArray`,
// and mergeMap's numeric second argument caps concurrent requests at 1.
// httpGet is only invoked when a slot is free, so requests start one at a time.
let source = Rx.Observable
  .from(array)
  .mergeMap(httpGet, 1);

Note that a mergeMap with concurrent specified as 1 is equivalent to concatMap.

The reason the code in your question sends all of the requests at once is that your httpGet function is called in the map operator. httpGet returns a Promise, and promises are not lazy - as soon as httpGet is called, the request is sent.

With the above code, httpGet is only called by the mergeMap implementation when fewer than the specified number of requests are in flight.

The code above will emit each response separately from the composed observable. If you want the responses combined into an array that is emitted when all requests have completed, you can use the toArray operator:

// Requires the `rxjs` package (RxJS 5). Same one-at-a-time pipeline as above,
// but toArray collects every response and emits a single array once all
// requests have completed.
let source = Rx.Observable
  .from(array)
  .mergeMap(httpGet, 1)
  .toArray();

You should also check out the recipes that Martin has referenced in his comment.

cartant
  • 57,105
  • 17
  • 163
  • 197
  • Thank you. Your answer gave me a few hints to figure out that my sample needed some tweaks, and also that rx != rxjs: rx seems to be dated, and concurrency didn't work with it. See the examples in my answer below. – Diego Mar 26 '17 at 21:20
4

Rx.Observable.fromPromise may be useful in your case. Expanding on cartant's answer, try this, where concurrent is specified as 1:

// Wrap each request-promise call in an Observable. The Promise is created
// inside the mergeMap projection, so a request starts only when the single
// concurrency slot (second argument = 1) is free.
const fetchUrl = url => Rx.Observable.fromPromise(rp.get(url));

Rx.Observable.from(array)
  .mergeMap(fetchUrl, 1)
  .subscribe(response => console.log(response))

For time-based control (one batch of two requests started per second), this is what I can think of:

// Time-based throttling: group URLs into batches of two, release one batch
// per second by zipping with a timer (the selector keeps only the batch),
// then fire every request in the released batch.
Rx.Observable.from(array)
  .bufferCount(2)
  .zip(Rx.Observable.timer(0, 1000), batch => batch)
  .mergeMap(batch => Rx.Observable.from(batch)
    .mergeMap(url => Rx.Observable.fromPromise(rp.get(url))))
  .subscribe(x => console.log(x))
user3587412
  • 1,053
  • 1
  • 12
  • 13
2

Thanks for the responses above. My issue was that I was using the rx NPM module instead of rxjs. After I uninstalled rx and installed rxjs, all the examples limited concurrency as expected: concurrent HTTP calls with Promises, callbacks, and native Observables all worked fine.

I'm posting them here in case anyone runs into similar issues and needs to troubleshoot.

HTTP Request Callback-Based Sample:

// Callback-based sample. bindNodeCallback (rather than bindCallback) treats
// the first callback argument as an error, so a failed request reaches the
// error handler below instead of being emitted as a normal value.
var Rx = require('rxjs'),
  request = require('request'),
  request_rx = Rx.Observable.bindNodeCallback(request.get);

var array = [
  'https://httpbin.org/ip', 
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

// mergeMap's numeric second argument limits in-flight requests to 1.
var source = Rx.Observable.from(array).mergeMap(httpGet, 1);

// Returns an Observable for the request; the call is made when mergeMap
// subscribes to it, not when httpGet is called.
function httpGet(url) {
  return request_rx(url);
}

var subscription = source.subscribe(
  // request calls back with (err, response, body); bindNodeCallback emits the
  // values after `err` as an array, i.e. [response, body].
  function (result) {
    var response = result[0];
    console.log('=====', response.body, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });

Promise-Based Sample:

// Promise-based sample (RxJS 5 `rxjs` package).
var Rx = require('rxjs'),
  rp = require('request-promise');

var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3',
  'https://httpbin.org/delay/3'
];

// mergeMap accepts a Promise from the projection. Because httpGet is called by
// mergeMap itself, and at most 1 concurrently, each request starts only after
// the previous one settles.
var source = Rx.Observable.from(array).mergeMap(httpGet, 1);

// Starts an HTTP GET for `url` and returns the request-promise Promise.
function httpGet(url) {
  return rp.get(url);
}

var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });

Native RxJS Sample:

// Native RxJS sample (RxJS 5 `rxjs` package) using superagent.
var Rx = require('rxjs'),
  superagent = require('superagent'),
  Observable = require('rxjs').Observable;

var array = [
  'https://httpbin.org/ip', 
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/10',
  'https://httpbin.org/delay/2',
  'https://httpbin.org/delay/2',
  'https://httpbin.org/delay/1',
];

// Reference point so each emission can be logged with its elapsed time in ms.
let start = (new Date()).getTime();

// At most one request in flight (numeric second argument = concurrency);
// each result is paired with the elapsed time since `start`.
var source = Rx.Observable.from(array)
    .mergeMap(httpGet, 1)
    .timestamp()
    .map(stamp => [stamp.timestamp - start, stamp.value]);

// Wraps a superagent GET in an Observable that emits the JSON-parsed response
// text once and then completes. Request errors and JSON parse errors are
// delivered through observer.error (RxJS 5 subscribers have no onError).
// Unsubscribing before the response arrives aborts the request.
function httpGet(apiUrl) {
  return Observable.create((observer) => {
    let finished = false;
    const req = superagent.get(apiUrl);
    req.end((err, res) => {
      finished = true;
      if (err) {
        observer.error(err);
        return;
      }
      let data;
      try {
        data = JSON.parse(res.text);
      } catch (parseErr) {
        observer.error(parseErr);
        return;
      }
      observer.next(data);
      observer.complete();
    });
    // Teardown: only abort if the response has not arrived yet.
    return () => {
      if (!finished) {
        req.abort();
      }
    };
  });
}

var subscription = source.subscribe(
  function (x) {
    console.log('=====', x, '======');
  },
  function (err) {
    console.log('Error: ' + err);
  },
  function () {
    console.log('Completed');
  });
Diego
  • 1,678
  • 1
  • 16
  • 20