13

How do I write a method that limits Q promise concurrency?

For instance, I have a method spawnProcess. It returns a Q promise.
I want no more than 5 process spawned at a time, but transparently to the calling code.

What I need to implement is a function with signature

function limitConcurrency(promiseFactory, limit)

that I can call like

spawnProcess = limitConcurrency(spawnProcess, 5);

// use spawnProcess as usual

I already started working on my version, but I wonder if anyone has a concise implementation that I can check against.

Dan Abramov
  • 264,556
  • 84
  • 409
  • 511
  • Are you writing code for the browser, or for Node? If it's the former, there is no concurrency... – Matt Ball Dec 12 '13 at 14:40
  • @Matt: I'm writing for the node. I don't mean concurrency as in threading, I mean concurrency as in “promises running at the same time”. – Dan Abramov Dec 12 '13 at 14:41
  • What did you try? Using a waiting queue and buffering requests shouldn't be too hard. – schlingel Dec 12 '13 at 14:43
  • @schlingel: I'm not saying it's hard. As soon as I'm ready, I'll post the complete code. I'm just trying to figure out how to properly chain promises so the next one starts *as soon as* the previous one has finished. – Dan Abramov Dec 12 '13 at 14:46
  • In case the request can be executed instantly you can return the deferred which is resolved by the process, in case you have to queue it you will have to use two seperate deferreds. One which calls a function which decreases the counter and additionally resolves the second deferred. The second deferred should be returned by the spawnProcess function. – schlingel Dec 12 '13 at 14:50
  • @schlingel: Thank you for the tips. Q promises always resolve asynchronously. Could you please review my answer? – Dan Abramov Dec 12 '13 at 15:37

4 Answers4

9

I have a library that does this for you https://github.com/ForbesLindesay/throat

You can use it via browserify or download the standalone build from brcdn (https://www.brcdn.org/?module=throat&version=latest) and add it as a script tag.

Then (assuming the Promise constructor is polyfilled or implemented in your environment) you can do:

//remove this line if using standalone build
var throat = require('throat');

function limitConcurrency(promiseFactory, limit) {
  var fn = throat(promiseFactory, limit);
  return function () {
    return Q(fn.apply(this, arguments));
  }
}

You could just call throat(promiseFactory, limit) directly but that would return a promise promise rather than a Q promise.

I also really like using it with array.map.

// only allow 3 parallel downloads
var downloadedItems = Q.all(items.map(throat(download, 3)));
ForbesLindesay
  • 10,482
  • 3
  • 47
  • 74
2

This seems to be working for me.

I'm not sure if I could simplify it. The recursion in scheduleNextJob is necessary so the running < limit and limit++ always execute in the same tick.

Also available as a gist.

'use strict';

var Q = require('q');

/**
 * Constructs a function that proxies to promiseFactory
 * limiting the count of promises that can run simultaneously.
 * @param promiseFactory function that returns promises.
 * @param limit how many promises are allowed to be running at the same time.
 * @returns function that returns a promise that eventually proxies to promiseFactory.
 */
function limitConcurrency(promiseFactory, limit) {
  var running = 0,
      semaphore;

  function scheduleNextJob() {
    if (running < limit) {
      running++;
      return Q();
    }

    if (!semaphore) {
      semaphore = Q.defer();
    }

    return semaphore.promise
      .finally(scheduleNextJob);
  }

  function processScheduledJobs() {
    running--;

    if (semaphore && running < limit) {
      semaphore.resolve();
      semaphore = null;
    }
  }

  return function () {
    var args = arguments;

    function runJob() {
      return promiseFactory.apply(this, args);
    }

    return scheduleNextJob()
      .then(runJob)
      .finally(processScheduledJobs);
  };
}

module.exports = {
  limitConcurrency: limitConcurrency
}
Dan Abramov
  • 264,556
  • 84
  • 409
  • 511
2

The Deferred promise implementation has gate function which works exactly that way:

spawnProcess = deferred.gate(spawnProcess, 5);    
Mariusz Nowak
  • 32,050
  • 5
  • 35
  • 37
2

I wrote a little library to do this: https://github.com/suprememoocow/qlimit

It's extremely easy to use and is specifically designed to work with Q promises:

var qlimit = require('qlimit');
var limit = qlimit(2); // 2 being the maximum concurrency

// Using the same example as above
return Q.all(items.map(limit(function(item, index, collection) { 
  return performOperationOnItem(item);
}));

It can also be used to limit concurrency to a specific resource, like this:

var qlimit = require('qlimit');
var limit = qlimit(2); // 2 being the maximum concurrency

var fetchSomethingFromEasilyOverwhelmedBackendServer = limit(function(id) {
  // Emulating the backend service
  return Q.delay(1000)
    .thenResolve({ hello: 'world' }); 
});
Andrew Newdigate
  • 6,005
  • 3
  • 37
  • 31