49

I've got a Node.js app that gets a list of files locally and uploads them to a server. This list could contain thousands of files.

for (var i = 0; i < files.length; i++) {
   upload_file(files[i]);
}

If I execute this with thousands of files, upload_file will get called thousands of times all at once, and most likely die (or at least struggle). In the synchronous world, we'd create a thread pool and limit it to a certain number of threads. Is there a simple way to limit how many asynchronous calls get executed at once?

Jon Nichols

5 Answers

76

As usual, I recommend Caolan McMahon's async module.

Make your upload_file function take a callback as its second parameter:

var async = require("async");

function upload_file(file, callback) {
    // Do funky stuff with file
    callback();
}

var queue = async.queue(upload_file, 10); // Run ten simultaneous uploads

queue.drain = function() {
    console.log("All files are uploaded");
};

// Queue your files for upload
queue.push(files);

queue.concurrency = 20; // Increase to twenty simultaneous uploads
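
Note that async v3 changed drain from an assignable property to a method. A minimal sketch of the same setup against a v3 install:

var async = require("async");

var queue = async.queue(upload_file, 10); // ten simultaneous uploads

// v3 style: register the drain handler by calling drain()
queue.drain(function() {
    console.log("All files are uploaded");
});

queue.push(files);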
Linus Thiel
25

The answer above, recommending async on npm, is the best answer, but if you'd like to learn more about control flow patterns:


You should look into control flow patterns. There's a wonderful discussion on control flow patterns in Chapter 7 of Mixu's Node Book. Namely, I'd look at the example in 7.2.3: Limited parallel - an asynchronous, parallel, concurrency limited for loop.

I've adapted his example:

function doUpload(file, callback) {
    // perform the file read & upload here, then signal completion
    callback();
}

var files   = [...];
var limit   = 10;       // concurrent read / upload limit
var running = 0;        // number of running async file operations

function uploader() {
    while(running < limit && files.length > 0) {
        var file = files.shift();
        doUpload(file, function() {
            running--;
            if(files.length > 0)
                uploader();
        });
        running++;
    }
}

uploader();
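
One thing to watch in the loop above: file is declared with var, so it is function-scoped and every completion callback closes over the same variable; logging it from inside a callback will show whichever file was shifted most recently. A minimal sketch of the same loop, assuming an ES6-capable runtime, keeping each iteration's file separate with let:

function uploader() {
    while(running < limit && files.length > 0) {
        let file = files.shift(); // block-scoped: each callback sees its own file
        doUpload(file, function() {
            running--;
            if(files.length > 0)
                uploader();
        });
        running++;
    }
}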
Wes Johnson
  • It works fine and runs only the specified number of async operations; however I noticed that the value of `file` inside `doUpload` (e.g. before `running--;`) doesn't contain the expected value, for example `console.log(file)` will print the same file 10 times on the first 10 lines (if `limit` has a value of 10, that is) – golimar Apr 18 '16 at 15:58
8

You should try queueing. I assume that a callback is fired when upload_file() finishes. Something like this should do the trick (untested):

function upload_files(files, maxSimultaneousUploads, callback) {
    var runningUploads = 0,
        startedUploads = 0,
        finishedUploads = 0;

    function next() {
        runningUploads--;
        finishedUploads++;

        if (finishedUploads == files.length) {
            callback();
        } else {
            // Make sure that we are running at the maximum capacity.
            queue();
        }
    }

    function queue() {
        // Run as many uploads as possible while not exceeding the given limit.
        while (startedUploads < files.length && runningUploads < maxSimultaneousUploads) {
            runningUploads++;
            upload_file(files[startedUploads++], next);
        }
    }

    // Start the upload!
    queue();
}
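
A usage sketch, assuming upload_file(file, done) invokes done once the transfer finishes:

upload_files(files, 10, function() {
    console.log("All " + files.length + " files have been uploaded");
});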
jwueller
3

The other answers seem to be outdated. This can be solved easily using parallelLimit from async. Below is how to use it. I haven't tested it.

var async = require("async");

var tasks = files.map(function(f) {
    return function(callback) {
        upload_file(f, callback);
    };
});

async.parallelLimit(tasks, 10, function(err, results) {
    // All uploads have finished (or one of them failed with err)
});
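
If you'd rather not build the intermediate tasks array, the same library also offers eachLimit, which applies an async iteratee to each item with a concurrency cap. A minimal sketch, assuming upload_file(file, callback) signals completion through its callback:

var async = require("async");

async.eachLimit(files, 10, upload_file, function(err) {
    if (err) {
        console.error("An upload failed:", err);
    } else {
        console.log("All files are uploaded");
    }
});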
Arun Ghosh
0

No external libraries. Just plain JS.

It can be solved using recursion.

The idea is to immediately start the maximum allowed number of uploads, and have each of these requests recursively initiate a new upload on its completion.

In this example I collect successful responses together with errors and run all requests to completion, but the algorithm can be slightly modified if you want to terminate the batch upload on the first failure (a sketch of that variant follows the example).

async function batchUpload(files, limit) {
  limit = Math.min(files.length, limit);

  return new Promise((resolve, reject) => {
    // Guard: with no files the start loop below never runs, so resolve immediately
    if (files.length === 0) {
      resolve([]);
      return;
    }

    const responsesOrErrors = new Array(files.length);
    let startedCount = 0;
    let finishedCount = 0;
    let hasErrors = false;

    function recursiveUpload() {
      let index = startedCount++;

      uploadFile(files[index])
        .then(res => {
          responsesOrErrors[index] = res;
        })
        .catch(error => {
          responsesOrErrors[index] = error;
          hasErrors = true;
        })
        .finally(() => {
          finishedCount++;
          if (finishedCount === files.length) {
            hasErrors ? reject(responsesOrErrors) : resolve(responsesOrErrors);
          } else if (startedCount < files.length) {
            recursiveUpload();
          }
        });
    }

    for (let i = 0; i < limit; i++) {
      recursiveUpload();
    }
  });
}

async function uploadFile(file) {
  console.log(`${file} started`);
  const delay = Math.floor(Math.random() * 1500);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (delay <= 1000) {
        console.log(`${file} finished successfully`);
        resolve(`${file} success`);
      } else {
        console.log(`${file} finished with error`);
        reject(`${file} error`);
      }
    }, delay);
  });
}

const files = new Array(10).fill('file').map((file, index) => `${file}_${index + 1}`);

batchUpload(files, 3)
  .then(responses => console.log('All successful', responses))
  .catch(responsesWithErrors => console.log('All finished, some with errors', responsesWithErrors));
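
For the fail-fast modification mentioned above, one possible sketch (illustrative, not part of the original answer) is to reject as soon as any upload fails and stop starting new ones; uploads already in flight still settle on their own:

async function batchUploadFailFast(files, limit) {
  limit = Math.min(files.length, limit);

  return new Promise((resolve, reject) => {
    const responses = new Array(files.length);
    let startedCount = 0;
    let finishedCount = 0;
    let failed = false;

    function recursiveUpload() {
      const index = startedCount++;

      uploadFile(files[index])
        .then(res => {
          responses[index] = res;
          finishedCount++;
          if (failed) return; // the batch was already rejected, start nothing new
          if (finishedCount === files.length) {
            resolve(responses);
          } else if (startedCount < files.length) {
            recursiveUpload(); // keep the pipeline full
          }
        })
        .catch(error => {
          if (!failed) {
            failed = true;
            reject(error); // terminate the batch on the first failure
          }
        });
    }

    for (let i = 0; i < limit; i++) {
      recursiveUpload();
    }
  });
}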
Anton Fil