
I'm fairly new to node, and I've been learning about async/promises. Right now I'm trying to create a process that selects rows from a DB (10K rows, for example), calls a webservice that transforms one column, and then inserts the modified data.

So, I make an Oracle SQL query, and for the result, I do a foreach:

let counter = 0;
var binds = [];

res.rows.forEach((row) => {

    var original_data = row[0];

    Transform(original_data).then(new_data => {

        counter++;

        binds.push([original_data,new_data]);

        if (counter % 1000 === 0){
            console.log(`1K rows`);
            doInserts(binds);
            binds = [];
        }

    });

});

I'm calling the doInserts every 1000 rows, so I don't have many transactions open on Oracle.

The Transform function calls a webservice, which resolves with the value that I need.

function Transform(value){

    return new Promise(function(resolve, reject){
        var requestPath = `http://localhost:3000/transform/${value}`;
        request.get(requestPath, function(err, response, body){
            if (!err && response.statusCode === 200){
                resolve(body);
            }else{
                reject(new Error("API didn't respond."));
            }
        });
    });

}

However, this is clogging the webservice (I use the request library to connect) when the forEach has 10K rows. I suspect the forEach is not waiting for each Transform to finish before starting the next one.
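For comparison, a for...of loop with an await inside would process one row at a time, while forEach fires every Transform immediately. A minimal sketch of the sequential version (fakeTransform below is a stand-in for the real webservice call, not the actual code):

```javascript
// fakeTransform stands in for the real web-service call.
const fakeTransform = value =>
  new Promise(resolve => setTimeout(() => resolve(value * 2), 10));

async function sequential(rows) {
  const results = [];
  for (const row of rows) {
    // `await` pauses the loop until each call resolves,
    // so only one request is in flight at a time.
    results.push(await fakeTransform(row));
  }
  return results;
}

sequential([1, 2, 3]).then(results => console.log(results)); // [2, 4, 6]
```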

This is probably me not knowing lots of node, async, promises.. but I'm puzzled. Can anyone help?

Fede E.
  • https://blog.slaks.net/2015-06-10/advanced-promise-usage/ – SLaks Jun 01 '18 at 16:43
  • possible duplicate of https://stackoverflow.com/questions/14220321/how-do-i-return-the-response-from-an-asynchronous-call – J. Pichardo Jun 01 '18 at 16:46
  • Wow, where to begin? :) 1) Where does `res.rows` come from? 2) Your `forEach` off `res.rows` is a sync loop doing async work, generally not a good idea. 3) Seems like your transform is on localhost. Can you just call a function to do the work? 4) The logic to get rows and the logic to do inserts looks risky. What happens if it fails in the middle? Why not just fetch 1000 rows at a time if that's the batch size you want - then you just need to ensure you have a way to query the next "unprocessed" set. 5) How is `doInserts` implemented? – Dan McGhan Jun 01 '18 at 19:03
  • If you're inserting lots of rows, make sure to use executeMany() and don't do single-row inserts. And use binds. – Christopher Jones Jun 04 '18 at 00:30

3 Answers


You are making too many requests at the same time. Try setting a concurrency limit. You can use bluebird's Promise.map: http://bluebirdjs.com/docs/api/promise.map.html

await Promise.map(rows, async (row) => {
    const new_data = await Transform(row[0]);
    // ...
}, {concurrency: 3});  // allow at most 3 requests at the same time
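If you would rather not add bluebird, the same idea can be sketched with a small dependency-free helper (mapWithConcurrency is a hypothetical name, not a library function):

```javascript
// Process `items` with at most `limit` calls in flight at once.
// `fn` is the async transform applied to each item.
async function mapWithConcurrency(items, limit, fn) {
  const results = new Array(items.length);
  let next = 0;
  async function worker() {
    while (next < items.length) {
      const i = next++;                  // claim the next unprocessed index
      results[i] = await fn(items[i], i); // only `limit` of these run at once
    }
  }
  // Start `limit` workers that pull items until none remain.
  const workers = Array.from({ length: Math.min(limit, items.length) }, worker);
  await Promise.all(workers);
  return results;
}
```

Usage would be something like `const pairs = await mapWithConcurrency(res.rows, 3, row => Transform(row[0]));`.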
Xuezheng Ma

You can use any promise library or ES6 Promise to gather an array of promises and resolve them all together.

In this example, I will use bluebird:

const Promise = require('bluebird');

async function resolvingManyPromises() {
    // Collect one promise per row without waiting on any of them yet.
    const promiseArray = res.rows.map(row => Transform(row[0]));

    // Resolve them all together.
    const resolvedPromises = await Promise.all(promiseArray);

    // Do something with the resolved values in resolvedPromises
}

Note that Promise.all will attempt to resolve all of the promises in the array at once, in parallel. If your DB has a connection limit, some calls may fail.
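On Node 12.9+ you could use Promise.allSettled instead, which waits for every promise and keeps the successes even when some reject (sketched here with dummy promises, not the real Transform calls):

```javascript
// Dummy promises standing in for Transform() calls.
const calls = [
  Promise.resolve('a'),
  Promise.reject(new Error("API didn't respond.")),
  Promise.resolve('c')
];

// allSettled never rejects; each outcome is
// { status: 'fulfilled', value } or { status: 'rejected', reason }.
Promise.allSettled(calls).then(outcomes => {
  const successes = outcomes
    .filter(o => o.status === 'fulfilled')
    .map(o => o.value);
  const failures = outcomes
    .filter(o => o.status === 'rejected')
    .map(o => o.reason.message);
  console.log(successes); // ['a', 'c']
  console.log(failures);  // ["API didn't respond."]
});
```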

Jason Kim
  • Thanks Jason, but I think that is what is happening. The request in Transform() is making too many connections, causing the API to stop responding. – Fede E. Jun 01 '18 at 17:37

The selected answer rejects as soon as one request fails, and all the successful results are lost with it (if only the last one rejects, you lose every other result).

Here is code adapted to your situation; a little more information can be found in the repository linked in the code comment below. It does not use bluebird but uses lib.throttle and lib.batchProcess (from a lib of generally useful functions I wrote):

//lib comes from: https://github.com/amsterdamharu/lib/blob/master/src/index.js
const lib = require("lib");

const Fail = function(reason){this.reason=reason;};
const isFail = o=>(o&&o.constructor)===Fail;
const isNotFail = o=>!isFail(o);
const handleBatch = results =>{//this will handle results of a batch
  //failed are the requests that failed
  //you may want to save the ones that failed to file or something
  const failed = results.filter(isFail);
  const successes = results.filter(result=>!isFail(result));
  return doInserts(successes);
};
const processor = throttler => row =>
  throttler(//throttling Transform to max 10 active
    row=>
      Transform(row[0])
      .then(new_data =>[row[0],new_data])
    )(row)
    .catch(err=>new Fail([err,row]))//catch reject and resolve with fail object
;
//start the process
lib.batchProcess (handleBatch) (1000) (processor(lib.throttle(10))) ([]) (res.rows)
.then(
  results=>console.log("Process done")
  ,err=>console.error("This should not happen:",err)
);
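For reference, the throttling idea can be sketched in a few lines; this is only an illustration of what a concurrency limiter does, not the actual lib.throttle implementation:

```javascript
// throttle(max) returns a wrapper: calls beyond `max` concurrently
// running ones are queued and started as earlier calls finish.
const throttle = max => {
  let active = 0;
  const queue = [];
  const runNext = () => {
    if (active >= max || queue.length === 0) return;
    active++;
    const { fn, arg, resolve, reject } = queue.shift();
    Promise.resolve(fn(arg))
      .then(resolve, reject)
      .finally(() => { active--; runNext(); });
  };
  return fn => arg =>
    new Promise((resolve, reject) => {
      queue.push({ fn, arg, resolve, reject });
      runNext();
    });
};
```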
HMR