
According to the NestJS Docs, there are two ways of implementing queues: Redis-backed named or unnamed queues that are processed inside the application (e.g. Queue Redis Example or Queue Redis Example 2), and queues whose processors run in separate processes.

I recently tried to implement the second way, described in Queues in separate processes, but my processors are not recognised and I end up with:

Error message

Can anyone help me and provide a minimal working example of this?

I saw that there is already a very recently opened issue about this: Queues Processes Docs Issue. If somebody can help me or the person in that thread, I can open a PR for the docs, or the official maintainers could do so.

Thanks in advance! Best regards Ragitagha

ragitagha
  • Can you provide a code sample or repo to review? The error is suggesting that your csv-queue processor is not accessible in the current scope of the process you're running. – The Geek May 08 '21 at 10:59
  • You are correct: I tried to move from the normal queue to separate processors, and then the processor is no longer found. Do you have experience with separate queues? I will provide a sample once I get home. Thanks! – ragitagha May 08 '21 at 13:03
  • @TheGeek Please find a minimal repo here: [Github repo](https://github.com/Ragitagha/nestjs-queues). This works with normal queues. If you clone it, please start a Redis Docker container on port 6379. My problem is figuring out how to implement the same queue in a separate process, as described here: [NestJs Docs](https://docs.nestjs.com/techniques/queues#separate-processes). May I kindly ask for your help? Thanks in advance – ragitagha May 08 '21 at 13:22

1 Answer


The NestJS Bull package is a nice wrapper around Bull, which supports running jobs in a separate process.

To take advantage of Bull's automatic process forking, you just need to provide the path to a file that can act as the job's processor.

Create the separate file that will process the job:

import { Logger } from '@nestjs/common';
import { DoneCallback, Job } from 'bull';

export default function (job: Job<{ message: string }>, cb: DoneCallback) {
  Logger.verbose(`${job.data.message} (pid ${process.pid})`, `SEPARATE`);
  cb(null, 'Hurrah');
}

Remember, this file must be able to run on its own. You can of course add further imports, but because it runs in a different process, your application and its resources (such as injected providers) will not be available to it.

When registering your queues, specify the path to the separate file. Here I register two queues: one that runs in the SAME process as the application and one that runs in a SEPARATE process:
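To illustrate why the application's state is out of reach, here is a minimal sketch using only Node.js built-ins. It is not how Bull forks its workers internally; it simply starts a fresh Node.js process with `execFileSync` and asks it whether it can see a variable defined in the parent. The names `appState`, `childScript` and `probeChildProcess` are made up for this example.

```typescript
import { execFileSync } from 'node:child_process';

// State that exists only in the parent (application) process.
const appState = { requestsServed: 42 };

// Script executed by a fresh Node.js process: it reports its own pid and
// whether a variable called `appState` exists in its scope.
const childScript =
  'console.log(JSON.stringify({ pid: process.pid, sees: typeof appState }))';

// Hypothetical helper: runs the script in a new process and parses its output.
function probeChildProcess(): { pid: number; sees: string } {
  const stdout = execFileSync(process.execPath, ['-e', childScript], {
    encoding: 'utf8',
  });
  return JSON.parse(stdout);
}

const child = probeChildProcess();
console.log(
  `parent pid ${process.pid} has appState with ${Object.keys(appState).length} key(s); ` +
    `child pid ${child.pid} sees appState as ${child.sees}`,
);
```

The child reports a different pid and does not see `appState`, which is the same situation a separate processor file is in: anything it needs has to be imported or created inside that file, or passed in through the job data.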

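import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { join } from 'path';
// AppController and SameService are this project's own classes; import them from wherever they live.

@Module({
  imports: [
    // register the shared Redis connection
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
    // register queues
    BullModule.registerQueue(
      {
        name: 'SAME', // processed in the same process as this module
      },
      {
        name: 'SEPARATE', // processed in its own process
        processors: [join(__dirname, 'separate.process')],
      },
    ),
  ],
  controllers: [AppController],
  providers: [SameService],
})
export class AppModule {}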
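If the separate processor is not picked up, one thing worth checking is whether the path handed to `processors` actually points at a file in the build output. The sketch below is a hypothetical helper, not part of Bull or `@nestjs/bull`. It assumes that a path without a `.js` or `.ts` extension is resolved by appending `.js`, which matches how the `separate.process` path above ends up pointing at the compiled `separate.process.js`; treat that rule as an assumption and adjust it if your setup differs.

```typescript
import { existsSync, mkdtempSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { extname, join } from 'node:path';

// Hypothetical helper (not a Bull API): returns the file a processor path is
// expected to resolve to, and throws early if that file does not exist.
// Assumption: paths without a .js/.ts extension get '.js' appended.
function resolveProcessorFile(processorPath: string): string {
  const hasScriptExtension = ['.js', '.ts'].includes(extname(processorPath));
  const file = hasScriptExtension ? processorPath : `${processorPath}.js`;
  if (!existsSync(file)) {
    throw new Error(`Processor file not found: ${file}`);
  }
  return file;
}

// Usage: simulate a build output directory that contains the compiled processor.
const distDir = mkdtempSync(join(tmpdir(), 'dist-'));
writeFileSync(join(distDir, 'separate.process.js'), 'module.exports = () => {};');

const resolvedFile = resolveProcessorFile(join(distDir, 'separate.process'));
console.log(resolvedFile);
```

Calling such a helper with `join(__dirname, 'separate.process')` at startup would fail fast with a clear message if the processor file was not compiled or copied next to the module.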

Then you can fire off jobs as normal. Below I add one job to the queue processed in the application process, then another to the queue processed by the separate file:

import { InjectQueue } from '@nestjs/bull';
import { Controller, Get } from '@nestjs/common';
import { Queue } from 'bull';

@Controller()
export class AppController {
  constructor(
    @InjectQueue('SAME') private readonly same: Queue,
    @InjectQueue('SEPARATE') private readonly separate: Queue,
  ) {}

  @Get()
  async getHello(): Promise<string> {
    // Add a job that is processed in the same process as the application
    await this.same.add({ message: 'Knock knock.' });

    // Add a job that is processed in a separate (forked) process
    await this.separate.add({ message: 'FORK OFF.' });

    return 'ok';
  }
}

Requesting localhost:3000 then logs the following (note the different pids):

[Nest] 13400   - 08/05/2021, 16:49:18   [SAME] Knock knock. (pid 13400) +8821ms
[Nest] 2660   - 08/05/2021, 16:49:19   [SEPARATE] FORK OFF. (pid 2660)

Here is a link to the repo

Hope this helps!

The Geek