2
  • I have an express app which uses bullmq queues, schedulers and workers. Even after pressing Ctrl + C I can still see the node process running inside my Activity manager but my server on the terminal shuts down. I know this because the bullmq task starts outputting console.log statements even after the server is down to the terminal.

This is what my server.js file looks like

// eslint-disable-next-line import/first
import http from 'http';
import { app } from './app';
import { sessionParser } from './session';
import { websocketServer } from './ws';
import 'jobs/repeatable';

const server = http.createServer(app);

server.on('upgrade', (request, socket, head) => {
  sessionParser(request, {}, () => {
    websocketServer.handleUpgrade(request, socket, head, (ws) => {
      websocketServer.emit('connection', ws, request);
    });
  });
});

server.on('listening', () => {
  websocketServer.emit('listening');
});

server.on('close', () => {
  websocketServer.emit('close');
});

// https://stackoverflow.com/questions/18692536/node-js-server-close-event-doesnt-appear-to-fire
process.on('SIGINT', () => {
  server.close();
});

export { server };

Notice that I have a SIGINT handler defined above. Is this the reason my jobs are not exiting? Do I have to manually close every queue, worker and scheduler inside my SIGINT? My jobs/repeatable.js file looks as shown below

const { scheduleJobs } = require('jobs');

if (process.env.ENABLE_JOB_QUEUE === 'true') {
  scheduleJobs();
}

Here is my jobs.js file

import { scheduleDeleteExpiredTokensJob } from './delete-expired-tokens';
import { scheduleDeleteNullVotesJob } from './delete-null-votes';

export async function scheduleJobs() {
  await scheduleDeleteExpiredTokensJob();
  await scheduleDeleteNullVotesJob();
}

Here is my delete-expired-tokens.js file, other one is quite similar

import { processor as deleteExpiredTokensProcessor } from './processor';
import { queue as deleteExpiredTokensQueue } from './queue';
import { scheduler as deleteExpiredTokensScheduler } from './scheduler';
import { worker as deleteExpiredTokensWorker } from './worker';

export async function scheduleDeleteExpiredTokensJob() {
  const jobId = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_ID;
  const jobName = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_NAME;
  await deleteExpiredTokensQueue.add(jobName, null, {
    repeat: {
      cron: process.env.QUEUE_DELETE_EXPIRED_TOKENS_FREQUENCY,
      jobId,
    },
  });
}

export {
  deleteExpiredTokensProcessor,
  deleteExpiredTokensQueue,
  deleteExpiredTokensScheduler,
  deleteExpiredTokensWorker,
};

How do I shutdown bullmq task queues gracefully?

PirateApp
  • 5,433
  • 4
  • 57
  • 90

1 Answers1

2

You have to call the close() method on the workers:

server.on('close', async () => {
  websocketServer.emit('close');
  // Close the workers
  await worker.close()
});

Docs

Ionică Bizău
  • 109,027
  • 88
  • 289
  • 474
  • what about the scheduler and the queue? that doesnt need to be closed? – PirateApp Feb 18 '22 at 08:32
  • 1
    @PirateApp I am unsure about the `jobs` package you are using. It seems to be different than the one on npm. – Ionică Bizău Feb 18 '22 at 08:50
  • its not a package, the jobs is a reference to the jobs directory which simply contains a function i wrote called scheduleJobs, i am using babel module resolver to relatively reference files, basically it means this import { scheduleJobs } from 'jobs/index'; – PirateApp Feb 18 '22 at 08:53
  • 1
    @PirateApp Depending on how the scheduler is working. Those cron-like schedulers run in the same process as the server if I am not wrong, so, the worst thing that can happen is killing the prcess while the scheduler job is running. – Ionică Bizău Feb 18 '22 at 09:27
  • 1
    ok without closing the worker, scheduler and queue i can still see the node process running inside activity manager so my guess is that all 3 of them have to be closed – PirateApp Feb 19 '22 at 03:24