
I am trying to read some large CSV files and process the data, but there is a rate limit on processing, so I want to add a 1-minute delay between each request. I tried setTimeout, but then found out that there is a limit on its delay value and I get the following error. I am not sure of any other way to handle the situation; the CSV file has more than 1M records. Am I doing anything wrong here?

Error

Timeout duration was set to 1. (node:41) TimeoutOverflowWarning: 2241362000 does not fit into a 32-bit signed integer.
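
For reference, setTimeout() stores its delay as a 32-bit signed integer, so anything above 2147483647 ms (about 24.8 days) overflows and Node resets the delay to 1. With 60000 * counter that happens once counter passes roughly 35791:

// Rough illustration of the limit (not part of the original code)
const MAX_TIMEOUT_MS = 2 ** 31 - 1;               // 2147483647 ms, about 24.8 days
console.log(Math.floor(MAX_TIMEOUT_MS / 60000));  // ~35791 rows before 60000 * counter overflows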

Sample code:

const fs = require('fs');
const csv = require('csv-parser'); // assuming csv-parser; any streaming CSV parser used this way works
const Queue = require('bull');

// config and connectRedis come from the application's own modules (not shown)
const domainQueue = new Queue(config.api.crawlerQ, {
  redis: connectRedis(),
});

let ctr = 0;

function processCSV(name, fileName, options) {
  return new Promise((resolve, reject) => {
    console.log('process csv started', new Date());
    const filePath = config.api.basePath + fileName;
    const stream = fs.createReadStream(filePath)
        .on('error', (error) => {
          // handle error
          console.log('error processing csv');
          reject(error);
        })
        .pipe(csv())
        .on('data', (row) => {
          ctr++;
          increment(row, ctr);
        })
        .on('end', () => {
          console.log('stream processCSV end', fileName, new Date());
          resolve(filePath);
        });
  });
}

async function increment(raw, counter) {
  setTimeout(function () {
    console.log('say i am inside a function', counter, new Date());
    domainQueue.add(raw, options); // Add jobs to queue - here I need a delay of, say, 1 minute;
    // if I add jobs without a delay it will hit the rate limit
  }, 60000 * counter);
}

function queueWorkerProcess(value) { // Process jobs in queue and save in a text file
  console.log('value', value, new Date());
  return new Promise(resolve => {
    resolve();
  });
}
Dibish
  • I can't really tell what you're trying to do here. You're calling `increment()` on every single row of every single file. Apparently, `60000 * counter` gets to be too large a number. The timer doesn't seem to actually do anything useful so I have no idea what you're trying to accomplish with it. – jfriend00 May 17 '20 at 02:56
  • So, this code processes one local CSV file. What is the problem you're trying to solve with it? Where do you have a rate limit problem? I don't see it here in this code. We need to see the actual real code that has a problem in it and we need a complete description of the actual problem. – jfriend00 May 17 '20 at 02:58
  • yes, excel contains domain names, and I process every row of that file and make an external api call, so I want a delay between each request, so I used the counter. See this: https://borgs.cybrilla.com/tils/javascript-for-loop-with-delay-in-each-iteration-using-iife/ For the issue, see this: https://stackoverflow.com/questions/3468607/why-does-settimeout-break-for-large-millisecond-delay-values/3468650#3468650 – Dibish May 17 '20 at 05:39
  • I could probably help you in about 5 minutes if you showed your ACTUAL code and what you're actually trying to do and what the actual problem is. Literally, I could solve your problem in only a few minutes if you showed the WHOLE problem. But, I refuse to write answers that try to guess what you're really trying to do. So far, the only problem your code you show actually has is that you're trying to create too big an integer. Remove that `setTimeout()` that has no real purpose here and there's no problem with this code. Show us your REAL problem and the REAL code. – jfriend00 May 17 '20 at 05:41
  • I edited my question, I am using a bull queue job, and I want a delay when I add the job to the queue – Dibish May 17 '20 at 05:52
  • Why do you want to delay adding it to the queue? What problem are you trying to solve? What is the actual rate limit with the queue? It's best to code to actually meet the rate limit rather than just insert random delays. – jfriend00 May 17 '20 at 06:06
  • If I add a huge number of jobs (10 million) to the queue without a delay, the server stops with a JS heap out of memory error. – Dibish May 17 '20 at 06:07
  • That's not a rate limit, that's just too many items in the queue or too many queued operations underway such that you're using too much memory. Anyway, I'll code up a suggestion now that you've finally explained the actual problem. – jfriend00 May 17 '20 at 06:14
  • Looks like I need to know one more thing. How do you know when items that you put in the queue are done being processed? I need to know how many items are in the queue and need a notification when an item in the queue finishes. – jfriend00 May 17 '20 at 06:17
  • Do you really need to use this queue or are you just trying to write something to a text file. What are you actually doing with these rows? – jfriend00 May 17 '20 at 06:21
  • The queue provides a listener like the following: `queue.on('completed', async (job, result) => { })` – see: https://github.com/OptimalBits/bull – Dibish May 17 '20 at 06:21
  • And, is that `completed` event fired for every item that was inserted with `.add()`? – jfriend00 May 17 '20 at 06:22
  • It's a big background job which will take weeks to finish, so I must use queue jobs. I just want to give some delay after I add a job – Dibish May 17 '20 at 06:22
  • Yes, the completed event will fire for every item added – Dibish May 17 '20 at 06:23
  • A plain delay won't work. You have to pause the stream to keep data from piling in. See my answer below. – jfriend00 May 17 '20 at 06:36
  • Thanks, I will try this way and update you. Thanks for your help – Dibish May 17 '20 at 06:37

1 Answer


Here's a general idea. You need to keep track of how many items are in flight being processed, both to limit the amount of memory used and to control the load on whatever resource you're storing the results in.

When you hit some limit on how many are in flight, you pause the stream. When you get back below the limit, you resume the stream. You increment a counter on .add() and decrement a counter on the completed message to keep track of things. That's where you pause or resume the stream.

FYI, just inserting a setTimeout() somewhere won't help you. To get your memory usage under control, you have to pause the flow of data from the stream once you have too many items in process. Then, when the items get back under a threshold, you can resume the stream.

Here's an outline of what that could look like:

// fs and csv (e.g. csv-parser) are assumed to be required the same way as in the question
const fs = require('fs');
const csv = require('csv-parser');
const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
    redis: connectRedis(),
});

// counter that keeps track of how many items in the queue
let queueCntr = 0;

// you tune this constant up or down to manage memory usage or tweak performance
// this is what keeps you from having too many requests going at once
const queueMax = 20;

function processCSV(name, fileName, options) {
    return new Promise((resolve, reject) => {
        let paused = false;

        console.log('process csv started', new Date());
        const filePath = config.api.basePath + fileName;

        const stream = fs.createReadStream(filePath)
            .on('error', (error) => {
                // handle error
                console.log('error processing csv');
                domainQueue.off('completed', completed);
                reject(error);
            }).pipe(csv())
            .on('data', (row) => {
                increment(row);
            })
            .on('end', () => {
                console.log('stream processCSV end', fileName, new Date());
                domainQueue.off('completed', completed);
                resolve(filePath);
            });

        function completed() {
            --queueCntr;
            // see if queue got small enough we now resume the stream
            if (paused && queueCntr < queueMax) {
                stream.resume();
                paused = false;
            }
        }

        domainQueue.on('completed', completed);

        function increment(raw) {
            ++queueCntr;
            domainQueue.add(raw, options);
            if (!paused && queueCntr > queueMax) {
                stream.pause();
                paused = true;
            }
        }
    });
}

And, if you're calling processCSV() multiple times with different files, you should sequence them so you don't call the 2nd one until the first is done, the 3rd until the 2nd is done, and so on... You don't show that code, so we can't make a specific suggestion on that.
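
In general terms (the driver function, file list, and arguments here are placeholders, not taken from your code), it could look something like this:

// General sketch only: process the files strictly one after another by awaiting each processCSV() call
async function processAllFiles(fileNames) {
    for (const fileName of fileNames) {
        // the next file doesn't start until the previous stream has emitted 'end'
        await processCSV('someJobName', fileName, {});
    }
}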

jfriend00
  • Thank you @jfriend00, your comment really helped me. I took the idea of pause/resume to control the issue. I had tried the delay a number of ways but kept getting stuck. Thank you for suggesting the stream pause. – Dibish May 17 '20 at 09:48
  • Hi, it worked for me but I have an issue now: as I am processing several CSV files asynchronously using the single stream above, after pause/resume some streams are not ending, i.e. they are not emitting the 'end' event; it seems like they are stuck or lost in the middle. Do I need to use dynamic streams to handle each csv? – Dibish May 25 '20 at 04:37
  • `async function readDirectory() { try { fs.readdir(config.api.basePath + '/csvdir/', (err, files) => { if (err) throw err; for (const file of files) { console.log('file', file); processCSV(file) } }); } catch (error) { } } readDirectory();` – Dibish May 25 '20 at 04:37