8

I'm looking for a wrapper for promise-returning functions that can limit (throttle) concurrency, so that only a set number of calls to the wrapped function are in flight at any given time.

In the example below, calls to delayPromise should never run concurrently; they should all run one at a time, in first-come-first-served order.

import Promise from 'bluebird'

// Demo task: logs `str`, then returns the promise produced by bluebird's Promise.delay.
// NOTE(review): bluebird's Promise.delay takes milliseconds, so the name `seconds`
// is presumably a misnomer (the callers below pass 100) — confirm.
function _delayPromise (seconds, str) {
  console.log(str)
  return Promise.delay(seconds)
}

// Desired usage: wrap the task so that at most 1 call is in flight at a time.
// limitConcurrency is the wrapper this question is asking how to write.
let delayPromise = limitConcurrency(_delayPromise, 1)

// Issues three throttled delays one after another, waiting for each to settle
// before requesting the next.
async function a() {
  for (const label of ["a:a", "a:b", "a:c"]) {
    await delayPromise(100, label)
  }
}

// Issues three throttled delays one after another, waiting for each to settle
// before requesting the next.
async function b() {
  for (const label of ["b:a", "b:b", "b:c"]) {
    await delayPromise(100, label)
  }
}

// Start both sequences without awaiting either one, so a() and b() are in
// progress at the same time and both compete for the shared delayPromise wrapper.
a().then(() => console.log('done'))

b().then(() => console.log('done'))

Any ideas on how to get a queue like this set up?

I have a "debounce" function from the wonderful Benjamin Gruenbaum. I need to modify it to throttle a promise based on its own execution rather than on a fixed delay.

/**
 * Rate-limits calls to `fn`: at most `count` calls are started per `delay`
 * window. A slot is released `delay` after the call that took it was started,
 * regardless of when `fn` itself finishes.
 *
 * @param {Function} fn - function to invoke; its return value resolves the caller's promise
 * @param {number} delay - value handed to bluebird's Promise.delay before a slot is released
 * @param {number} count - maximum number of slots held at once
 * @returns {Function} wrapper returning a promise for the result of `fn`
 */
export function promiseDebounce (fn, delay, count) {
  const pending = []
  let inFlight = 0

  // Starts the oldest queued call if there is one and a slot is free.
  const drain = () => {
    const blocked = pending.length === 0 || inFlight === count
    if (blocked) return
    inFlight += 1
    // Give the slot back after `delay`, then try to start the next queued call.
    Promise.delay(delay).tap(() => { inFlight -= 1 }).then(drain)
    const { context, args, resolve } = pending.shift()
    resolve(fn.apply(context, args))
  }

  return function debounced () {
    const context = this
    const args = arguments
    return new Promise((resolve) => {
      pending.push({ context, args, resolve })
      if (inFlight < count) drain()
    })
  }
}
trincot
  • 317,000
  • 35
  • 244
  • 286
ThomasReggi
  • 55,053
  • 85
  • 237
  • 424
  • 2
    async.js and queue.js both support configurable concurrency. – nrabinowitz Aug 04 '16 at 23:07
  • For arrays or managing the state of a given function and it's instances? – ThomasReggi Aug 04 '16 at 23:08
  • @nrabinowitz both of those libs have nothing to do with promises. – ThomasReggi Aug 04 '16 at 23:09
  • Both of those libs have to do with managing async processes - it doesn't matter whether they're promise-based, though it's true that those libs are focused on callback-style functions. – nrabinowitz Aug 04 '16 at 23:10
  • @nrabinowitz: …which does make them pretty unusable with promises – Bergi Aug 04 '16 at 23:12
  • possible duplicate of [How can I limit Q promise concurrency?](http://stackoverflow.com/q/20546373/1048572) and similar ones? Btw, you should know that SO is not a place to ask for libraries, so please remove your last sentence. – Bergi Aug 04 '16 at 23:15
  • 2
    `when a given promise is running so that only a set number of that promise is running at a given time` - the code has 6 promises, each will run exactly once - concurrently, but a given promise is run exactly once - the question is poorly worded at best – Jaromanda X Aug 04 '16 at 23:38
  • i have a recent answer [here](http://stackoverflow.com/questions/38732588/bluebirdjs-promises-wrapped-inside-a-for-loop/38732804#38732804) that should work if you use `async.eachLimit` instead of `async.eachSeries` *edit* if you need to run it continuously you should be able to combine `async.until` and `async.parallelLimit` – Plato Aug 05 '16 at 00:23
  • 3
    Just to be clear here. Promises don't "run". A promise is proxy for the result of an async operation that is already running. – jfriend00 Aug 05 '16 at 00:52
  • @ThomasReggi: You can use my [`AsyncSemaphore` from here](http://blog.stephencleary.com/2012/02/async-and-await.html). – Stephen Cleary Aug 08 '16 at 16:00
  • 1
    `es6-promise-pool`? `promise-limit`? `cwait`? Not relevant, or not known? – Lee Goddard Nov 07 '16 at 07:46
  • Does this answer your question? [Throttle amount of promises open at a given time](https://stackoverflow.com/questions/38385419/throttle-amount-of-promises-open-at-a-given-time) – Jeff Bowman Aug 23 '21 at 21:27

5 Answers5

9

I don't think there are any libraries to do this, but it's actually quite simple to implement yourself:

/**
 * Wraps `fn` so that its invocations run strictly one after another, in the
 * order the wrapper was called. Equivalent to limitConcurrency(fn, 1).
 *
 * All arguments given to the wrapper are forwarded to `fn` (the task in the
 * question takes two: a delay and a label).
 *
 * @param {Function} fn - function to serialise; may return a promise or a plain value
 * @returns {Function} wrapper returning a promise for the result of `fn`
 */
function sequential(fn) { // limitConcurrency(fn, 1)
    let q = Promise.resolve();
    return function(...args) {
        const p = q.then(() => fn(...args));
        // reflect() (bluebird) always fulfils, so a failed call does not
        // prevent the calls queued behind it from running.
        q = p.reflect();
        return p;
    };
}

For multiple concurrent requests it gets a little trickier, but can be done as well.

/**
 * Wraps `fn` so that at most `n` of its invocations are in flight at once.
 * Calls made while `n` are already running are queued and started, in order,
 * as running calls settle.
 *
 * All arguments given to the wrapper are forwarded to `fn`. A synchronous
 * throw from `fn` is reported as a rejection of the returned promise, so it
 * cannot break the internal queue.
 *
 * @param {Function} fn - function to throttle; may return a promise or a plain value
 * @param {number} n - maximum number of concurrent invocations
 * @returns {Function} wrapper returning a promise for the result of `fn`
 */
function limitConcurrency(fn, n) {
    if (n == 1) return sequential(fn); // optimisation
    // `q` settles when the next slot becomes free for a queued call.
    let q = Promise.resolve();
    const active = new Set();
    const fst = t => t[0];
    const snd = t => t[1];
    const settled = () => {};
    return function(...args) {
        function put() {
            // Wrapping captures synchronous throws and accepts any return
            // value (plain value, native promise or library promise).
            const p = new Promise(resolve => resolve(fn(...args)));
            // `a` always fulfils, and only after it has removed itself from
            // `active`, so whoever wakes on the race sees the freed slot.
            const a = p.then(settled, settled).then(() => {
                active.delete(a);
            });
            active.add(a);
            return [Promise.race(active), p];
        }
        if (active.size < n) {
            const r = put();
            q = fst(r);
            return snd(r);
        } else {
            const r = q.then(put);
            q = r.then(fst);
            return r.then(snd);
        }
    };
}

Btw, you might want to have a look at the actors model and CSP. They can simplify dealing with such things, there are a few JS libraries for them out there as well.

Example

import Promise from 'bluebird'

// Serialises calls to `fn`: each call starts only after the previous one has
// settled, and every caller gets a promise for its own result.
function sequential(fn) {
  let tail = Promise.resolve()
  const enqueue = (...args) => {
    const run = () => fn(...args)
    const result = tail.then(run)
    // reflect() always fulfils, so a rejected call does not stall the chain
    tail = result.reflect()
    return result
  }
  return enqueue
}

// Demo task: logs "<str> started", waits on bluebird's Promise.delay, logs
// "<str> ended", and resolves with `str`.
// NOTE(review): bluebird's Promise.delay takes milliseconds, so the name
// `seconds` is presumably a misnomer — confirm.
async function _delayPromise (seconds, str) {
  console.log(`${str} started`)
  await Promise.delay(seconds)
  console.log(`${str} ended`)
  return str
}

// Serialised version of the task: calls made through this binding run one at a time.
let delayPromise = sequential(_delayPromise)

// Requests three delays of increasing length, one after another.
async function a() {
  const schedule = [[100, "a:a"], [200, "a:b"], [300, "a:c"]]
  for (const [duration, label] of schedule) {
    await delayPromise(duration, label)
  }
}

// Requests three delays of increasing length, one after another.
async function b() {
  const schedule = [[400, "b:a"], [500, "b:b"], [600, "b:c"]]
  for (const [duration, label] of schedule) {
    await delayPromise(duration, label)
  }
}

// Start both sequences without awaiting either; each logs 'done' when its own
// three delays have finished. The transcripts below show the resulting order.
a().then(() => console.log('done'))
b().then(() => console.log('done'))

// --> with sequential()

// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done

// --> without calling sequential()

// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done
Bergi
  • 630,263
  • 148
  • 957
  • 1,375
0

Use the throttled-promise module:

https://www.npmjs.com/package/throttled-promise

// Usage sketch: `...` marks bodies and handlers elided by the author, so this
// snippet is not runnable as written.
// Each ThrottledPromise is constructed with a (resolve, reject) executor.
var ThrottledPromise = require('throttled-promise'),
    promises = [
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... }),
        new ThrottledPromise(function(resolve, reject) { ... })
    ];

// Run the promises, with at most 2 in parallel
ThrottledPromise.all(promises, 2)
.then( ... )
.catch( ... );
hunterloftis
  • 13,386
  • 5
  • 48
  • 50
  • 1
    I don't see how the OP's problem, which creates promises in different scopes, could be solved with this library. Can you please implement the `a` and `b` functions or `limitConcurrency`? – Bergi Aug 05 '16 at 02:16
  • 1
    This is a popular library addressing, quite literally, op's question: "I'm looking for a promise function wrapper that can limit / throttle when a given promise is running so that only a set number of that promise is running at a given time." If you're trying to throttle promises, you should use throttled-promise. SO is for answering questions, not writing code on demand. – hunterloftis Aug 08 '16 at 18:25
  • 1
    He is looking for a wrapper of promise-returning functions, not a wrapped `Promise` constructor. Also that library seems to require you to use `ThrottledPromise.all`, which does not fit the problem as stated in the question. Please try to implement the `a`/`b` example and you'll probably see how it does not work - or if it works, that code actually answers the question. Notice that SO is not a library recommendation service, but requires actual answers to the questions, which does involve code writing. – Bergi Aug 08 '16 at 18:37
0

I had the same problem, so I wrote a library to solve it. Code is here. It keeps a queue of all pending promises. When you push promises onto the queue, the first few at the head of the queue are popped and started. Once one of them is done, the next promise in the queue is popped and started, and so on until the queue has no tasks left. You can check the code for details. I hope this library helps you.

Jiaju
  • 1
  • 1
0

Advantages

  • you can define the number of concurrent promises (near-simultaneous requests)
  • consistent flow: once one promise resolves, another request starts, so there is no need to guess the server's capacity
  • robust against data choke: if the server stalls for a moment, it will just wait, and the next tasks will not start merely because a timer allowed it
  • does not rely on a 3rd-party module; it is vanilla Node.js

First, wrap https in a promise so we can use await to retrieve data (removed from the example). Second, create a promise scheduler that submits another request as soon as any promise settles. Third, make the calls.

Limiting requests by limiting the number of concurrent promises

const https = require('https')

/**
 * Sends an HTTPS request to dbase.ez-mn.net and resolves with the response.
 *
 * @param {string} method - HTTP method, e.g. 'GET' or 'PATCH'
 * @param {string} path - request path
 * @param {string|Buffer|Uint8Array|object|null} [body] - request payload;
 *   plain objects are serialised with JSON.stringify, strings and binary
 *   data are sent as given
 * @returns {Promise<{statusCode: number, headers: object, body: *}>} resolves
 *   once the response has ended; `body` is the JSON-parsed payload, or an
 *   empty array when the response had no content
 *   Rejects on request/response stream errors and when the response is not
 *   valid JSON.
 */
function httpRequest(method, path, body = null) {
  // A plain object cannot be written to the request stream or measured with
  // Buffer.byteLength, so serialise it first.
  const isPlainObject = typeof body === 'object' && body !== null && !ArrayBuffer.isView(body);
  const payload = isPlainObject ? JSON.stringify(body) : body;
  const reqOpt = { 
    method: method,
    path: path,
    hostname: 'dbase.ez-mn.net', 
    headers: {
      "Content-Type": "application/json",
      "Cache-Control": "no-cache"
    }
  }
  // NOTE(review): appending with '&' assumes `path` already carries a query string — confirm.
  if (method == 'GET') reqOpt.path = path + '&max=20000'
  if (payload) reqOpt.headers['Content-Length'] = Buffer.byteLength(payload);
  return new Promise((resolve, reject) => {
    const clientRequest = https.request(reqOpt, incomingMessage => {
      const response = {
        statusCode: incomingMessage.statusCode,
        headers: incomingMessage.headers,
        body: []
      };
      let chunks = ""
      // Decode in the stream so multi-byte characters split across chunks
      // are not corrupted by string concatenation.
      incomingMessage.setEncoding('utf8');
      incomingMessage.on('data', chunk => { chunks += chunk; });
      incomingMessage.on('error', error => { reject(error); });
      incomingMessage.on('end', () => {
        if (chunks) {
          try {
            response.body = JSON.parse(chunks);
          } catch (error) {
            reject(error)
            // Stop here: a failed parse must not be logged and resolved as a success.
            return
          }
        }
        console.log(response)
        resolve(response);
      });
    });
    clientRequest.on('error', error => { reject(error); });
    if (payload) { clientRequest.write(payload) }
    clientRequest.end();
  });
}

    /**
     * Wraps the promise-returning function `fn` so that at most `n` of its
     * calls are pending at once; extra calls wait until a pending one settles.
     * The wrapper resolves or rejects exactly like the promise `fn` returned.
     */
    const asyncLimit = (fn, n) => {
      const inFlight = new Set();
      const ignoreFailure = () => {};

      async function limited(...args) {
        // Wait until a slot is free; re-check after every wake-up because
        // another waiter may have taken the slot first.
        while (inFlight.size >= n) {
          await Promise.race(inFlight);
        }

        const outcome = fn.apply(this, args);
        // Track a never-rejecting shadow so the race above cannot throw.
        const tracker = outcome.catch(ignoreFailure);
        inFlight.add(tracker);
        await tracker;
        inFlight.delete(tracker);
        return outcome;
      }

      return limited;
    };
// httpRequest is the function whose number of concurrent calls we want to cap.
// Here at most 8 requests are pending at once; further calls wait for a free slot
// asynchronously, so other tasks are not blocked in the meantime.


let ratedhttpRequest = asyncLimit(httpRequest, 8);

// Sample dataset and the caller that sends it through the concurrency-limited client.
/**
 * Sends one PATCH request per record via ratedhttpRequest, waits for all of
 * them to settle, reports any failures, and then logs 'completed'.
 *
 * NOTE: this binding shadows Node's global `process` object within its scope.
 *
 * @returns {Promise<void>}
 */
let process = async () => {
  const patchData = [
    { path: '/rest/slots/80973975078587', body: { score: 3 } },
    { path: '/rest/slots/809739750DFA95', body: { score: 5 } },
    { path: '/rest/slots/AE0973750DFA96', body: { score: 5 } },
  ];

  // Issue every request up front; the limiter decides how many actually run
  // at once. Bodies are serialised here because the request stream needs a string.
  const outcomes = await Promise.allSettled(
    patchData.map(({ path, body }) => ratedhttpRequest('PATCH', path, JSON.stringify(body)))
  );

  // Surface failures instead of leaving them as unhandled rejections.
  for (const outcome of outcomes) {
    if (outcome.status === 'rejected') console.error(outcome.reason);
  }
  console.log('completed')
}

// Kick off the batch. The promise returned by process() is neither awaited nor
// given a catch handler here.
process() 
Thiago Conrado
  • 726
  • 8
  • 15
-2

The classic way of running async processes in series is to use async.js and use async.series(). If you prefer promise based code then there is a promise version of async.js: async-q

With async-q you can once again use series:

// Run the three delays in series: async-q invokes each function only after the
// promise returned by the previous one has resolved.
async.series([
    function(){return delayPromise(100, "a:a")},
    function(){return delayPromise(100, "a:b")},
    function(){return delayPromise(100, "a:c")}
])
.then(function(){
    // Log the literal text; a bare `done` identifier is undefined here.
    console.log('done');
});

Running two of them at the same time will run a and b concurrently but within each they will be sequential:

// These two series are started back to back, so they overlap in time,
// but each one runs its own array of functions sequentially:
async.series(a_array).then(()=>console.log('a done'));
async.series(b_array).then(()=>console.log('b done'));

If you want to run b after a then put it in the .then():

// Chain the second series off the first: b_array is only started from the
// .then() callback that runs once the a_array series has resolved.
async.series(a_array)
.then(()=>{
    console.log('a done');
    return async.series(b_array);
})
.then(()=>{
    console.log('b done');
});

If instead of running each sequentially you want to limit each to run a set number of processes concurrently then you can use parallelLimit():

// Run the functions in a_array with at most two in progress at a time:
async.parallelLimit(a_array,2)
.then(()=>console.log('done'));

Read up the async-q docs: https://github.com/dbushong/async-q/blob/master/READJSME.md

slebetman
  • 109,858
  • 19
  • 140
  • 171
  • OP does not want to know how to implement `a`/`b` without the async keyword, but how to implement `limitConcurrency` so that in the example the `delayPromise` calls from `a` and `b` are not parallel. – Bergi Aug 05 '16 at 21:31
  • @Bergi: That's what I showed. Using `async.series()` will run `delayPromise` in sequence. – slebetman Aug 05 '16 at 23:18
  • 1
    No. They should be sequential even when you call `a` and `b` concurrently. – Bergi Aug 06 '16 at 15:15