I have multiple Bull queues in my Node.js project; each one runs only if the previous queue's job completed successfully. I'm using them to verify email addresses:

  1. Check the Email format (formatQueue)

  2. Check email existence using the npm email-existence package (existenceQueue)

The formatQueue step is quick: it runs a regex to validate the email format. The email-existence package, however, takes around 5-10 seconds to complete.

formatQueue and existenceQueue work properly with a small number of jobs, such as 20-100. But when I add around 1000 jobs at a time, existenceQueue fails with the error below:

myemail@email.com job stalled more than allowable limit

I checked the issues HERE and HERE and thought the process was taking too long to respond, so I added a limiter as described HERE. But that did not help.

If a job in any of the queues fails, the next job is not processed. The queue stops there and the other jobs stay in the waiting state.

My code is similar to the code below. Please help me with this issue.

Queue.js

var Queue = require('bull');

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
    emails.forEach(element => {
        formatQueue.add(element, { attempts: 3, backoff: 1000 });
    });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function(job, done){
    FormatTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});

// Existence Test Process
existenceQueue.process(function(job, done){
    ExistenceTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});


// ------------ On Complete Handlers ------------
formatQueue.on('completed', function(job){
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function(job){
    QueueModel.lastStep(job.data)
});


// ------------ To update the email ------------
module.exports.lastStep = (data) => {
    Emails.updateEmail(data, (err, updated) => {
        if(!err) {
            formatQueue.clean('completed');
            existenceQueue.clean('completed');
        }
    })
}

--------- Update ---------

The processor was taking too long to respond, so the job was stalling, or failing because I was using a timeout.
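
For context, Bull's stalled-job handling is governed by the queue's advanced settings. Below is a minimal sketch of those options, assuming the documented `settings` keys `lockDuration`, `stalledInterval`, and `maxStalledCount`. The values are illustrative rather than recommendations, and `buildQueueOptions` is a hypothetical helper that is not part of Bull. Note that a stall usually means the job lock was not renewed in time, often because the event loop was blocked, so tuning these values may not address the root cause.

```javascript
// Hypothetical helper: builds the options object that would be passed as the
// third argument to `new Queue(name, redisUrl, options)`.
function buildQueueOptions(expectedJobMs) {
  return {
    settings: {
      // How long a worker holds the job lock before it must be renewed (ms).
      // Assumption: keep it at least twice the expected job duration.
      lockDuration: Math.max(30000, expectedJobMs * 2),
      // How often Bull checks for stalled jobs (ms).
      stalledInterval: 30000,
      // How many times a job may be recovered from a stall before it fails
      // with "job stalled more than allowable limit".
      maxStalledCount: 2,
    },
  };
}

// Example: existence checks that take up to roughly 10 seconds.
const existenceOptions = buildQueueOptions(10000);
// e.g. new Queue('existence', 'redis-db-url', existenceOptions);
```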

I'm now trying to run the processor in a separate file, as described in the Bull documentation. I've added the file as shown below.

// -------- Queue.js ----------

formatQueue.process(__dirname+"/processors/format-worker.js");


// On Complete Handler

formatQueue.on('completed', function(job, result){
    console.log(result, "Format-Complete-job"); // result is undefined
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

// -------- Queue.js ends ---------

//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        return Promise.resolve(data);
    });
}

Previously, in the completed handler, I got the job data with the updated parameters. Now I'm not getting the updated job data, and the second parameter described in the documentation, result, is undefined. How can I get the updated job data in this case?
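
One possible explanation, offered as a sketch rather than a confirmed diagnosis: the worker's exported function returns nothing, because its `return` sits inside the validator callback. A processor that itself returns a Promise would let Bull pass the resolved value to the `completed` handler as `result`. The `validate` function below is a hypothetical stand-in for `Validator.Format.validate`, assumed to call back with `(data)`, and the regex is only a placeholder check.

```javascript
// Hypothetical stand-in for Validator.Format.validate (callback receives data).
function validate(data, callback) {
  const isWellFormat = /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email);
  setImmediate(() =>
    callback(Object.assign({}, data, { is_well_format: isWellFormat }))
  );
}

// Processor that returns a Promise, so the resolved value becomes the job's
// result. In format-worker.js this function would be assigned to
// module.exports.
function processor(job) {
  return new Promise((resolve) => {
    validate(job.data, (data) => resolve(data));
  });
}
```

With this shape, the `completed` handler could branch on `result.is_well_format` and pass `result` to the next queue, instead of relying on a mutated `job.data`.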

rakcode
  • This queue behaviour is exactly what I want to achieve in my question here: https://stackoverflow.com/questions/61659677/bull-queue-when-a-job-fails-how-to-stop-queue-from-processing-remaining-jobs – Dheemanth Bhat May 07 '20 at 14:09
  • Hello mate, in your case you want to stop the queue, but in my case I want to update the job data and continue processing the other data. – rakcode May 08 '20 at 14:24

1 Answer

Try repeatable jobs

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
  emails.forEach(element => {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    formatQueue.add(element, jobOptions);
  });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function (job, done) {
  FormatTest.validate(job.data, (err, data) => {
    if (err) {
      // Fail the job, passing the error to Bull
      done(err);
    } else {
      job.data = data;
      // Complete the job; data is passed on as the job result
      done(null, data);
    }
  });
});

// Existence Test Process
existenceQueue.process(function (job, done) {
  ExistenceTest.validate(job.data, (err, data) => {
    if (err) {
      // Fail the job, passing the error to Bull
      done(err);
    } else {
      job.data = data;
      // Complete the job; data is passed on as the job result
      done(null, data);
    }
  });
});


// ------------ On Complete Handlers ------------
formatQueue.on('completed', function (job) {
  if (job.data.is_well_format) {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    existenceQueue.add(job.data, jobOptions);
  } else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function (job) {
  QueueModel.lastStep(job.data)
});


// ------------ To update the email ------------
module.exports.lastStep = (data) => {
  Emails.updateEmail(data, (err, updated) => {
    if (!err) {
      formatQueue.clean('completed');
      existenceQueue.clean('completed');
    }
  })
}

Dheemanth Bhat
  • Hello, I think this repeats the job, right? How is it relevant to that error? Can you please explain, so that I can have a better understanding in the future? – rakcode May 08 '20 at 14:23
  • In one answer on GitHub, someone mentioned that it's an issue with Mongoose, and that increasing the poolSize value gets rid of this error. In my case there will be more than 1000 queries running. Every time the lastStep function is called, two things happen: it updates the email data, and it checks whether all emails in the list are completed; if so, it updates the list status. Does this affect the processing of the queues? – rakcode May 08 '20 at 14:29
  • Yes, the above code repeats the job, but by using removeRepeatable you can stop the repetition on the first success. In combination with removeOnComplete and removeOnFail you can make it more robust. I am struggling to digest Bull as well, even though there is extensive documentation. Discard the answer if it didn't help. – Dheemanth Bhat May 08 '20 at 16:32
  • Have a look at this repo: https://github.com/DheemanthBhat/bull-lib – Dheemanth Bhat May 08 '20 at 16:34
  • Sure thing. I'll try that and get back to you! – rakcode May 08 '20 at 19:56
  • The issue was with the processors: the processor was taking too long to respond. I'm trying to use a separate processor file for each worker (so that it runs in a separate thread and makes better use of `bull`, as I read in their docs). The processor runs properly, but the result is `undefined`. I've updated the question. Please have a look. – rakcode May 13 '20 at 09:13
  • Sure, will do. In the past few days I realized that Bull is a very powerful tool, but we have to use it "properly". – Dheemanth Bhat May 13 '20 at 09:39
  • Yes. I'm also working on understanding the package. Can we collaborate and learn and exchange the knowledge? Where can I ping you? – rakcode May 13 '20 at 09:57