4

I am trying to create a results queue on BullMQ where all workers can send results adding jobs with special crafted jobIDs. The idea is that all results arrive with a specific jobID generated so that I known exactly which process the result is for.

I've tried with getNextJob as described in the documentation, with no luck.

The solution I found is using the queuEvents: each process registers a listener on the waiting state of the results queue and, when the job with the id It needs arrives, the process gets the job with a getJob, reads the result data and tries to move the job to completed. It works and I can correctly grab the results produced by workers.

The problem I have is moving result jobs to the completed state, since I cannot configure a lock token with the getJob and I receive a Missing lock for job error and the job remains in the active state.

This is the (pseudo-)code I use on the process


const jobID = "THE_ID_OF_THE_JOB_I_AM_WAITING_FOR";
const token = `${jobID}_results_worker`;
const queueEvents = new QueueEvents('results');
const resQueue = this.queues.get('results');
// I define a callback function to be able to remove the listener
const waitResult = async (job: {jobId: any}) => {
  if (job.jobId === jobID){
    debug(`Result job for ${jobID} received!`);
    const resJob = await resQueue?.getJob(jobID) as Job;
    queueEvents.removeListener('waiting', waitResult);
    // THIS GENERATES the error 
    resJob?.moveToCompleted('Results received', token, false);
    resolve(resJob?.data);
  }
} 
// Register the callback function on the queue
const listener = queueEvents?.on('waiting', waitResult );

Does anybody have any idea about how to correctly handle the moveToCompleted?

  • I partially solved the problem removing the result job from the queue with the command ``resQueue?.remove(jobID)``. This is sub-optimal since i lose the history of the results – Fabrizio Invernizzi Sep 21 '21 at 12:58

1 Answers1

0

You could develop a results queue, like so: const queue_Results = new Queue('Results');, from there you could have a worker handle the event like so const worker_Results = new Worker('Results', async (job: Job) => { // do something with the results from other jobs })

BullMQ documentation about that method, here

New-Way
  • 507
  • 3
  • 12
  • The problem is that having a queue is not enough due to BullMQ model: if a worker gets a job, the job is removed from the queue. In my situation each worker is waiting its own response and I have a lot of workers since they are dynamically created. The way I have found is to have a single queue and use the ``queueEvents`` on each worker to see if something interesting happens on the queue and pull only the job each worker is waiting for. I'm trying to understand if this is a complete _anti-pattern_ or has some sense. – Fabrizio Invernizzi Oct 12 '21 at 15:25