I am trying to create a results queue on BullMQ where all workers can send results adding jobs with special crafted jobIDs. The idea is that all results arrive with a specific jobID generated so that I known exactly which process the result is for.
I've tried with getNextJob
as described in the documentation, with no luck.
The solution I found is using the queuEvents
: each process registers a listener on the waiting
state of the results queue and, when the job with the id It needs arrives, the process gets the job with a getJob
, reads the result data and tries to move the job to completed. It works and I can correctly grab the results produced by workers.
The problem I have is moving result jobs to the completed state, since I cannot configure a lock token with the getJob
and I receive a Missing lock for job
error and the job remains in the active state.
This is the (pseudo-)code I use on the process
const jobID = "THE_ID_OF_THE_JOB_I_AM_WAITING_FOR";
const token = `${jobID}_results_worker`;
const queueEvents = new QueueEvents('results');
const resQueue = this.queues.get('results');
// I define a callback function to be able to remove the listener
const waitResult = async (job: {jobId: any}) => {
if (job.jobId === jobID){
debug(`Result job for ${jobID} received!`);
const resJob = await resQueue?.getJob(jobID) as Job;
queueEvents.removeListener('waiting', waitResult);
// THIS GENERATES the error
resJob?.moveToCompleted('Results received', token, false);
resolve(resJob?.data);
}
}
// Register the callback function on the queue
const listener = queueEvents?.on('waiting', waitResult );
Does anybody have any idea about how to correctly handle the moveToCompleted
?