
I'm trying to implement queues using the Bull queue package in NestJS. I have two queues: a file queue and an email queue.

I use the file queue to regenerate thumbnails in the background after a thumbnail is uploaded, and the email queue to send emails. The file queue works fine, but the email queue behaves inconsistently: sometimes a job is picked up by the @Process('JobName') handler and sometimes it is not.

I also noticed that when the problem occurs, deleting the dist folder and restarting the server (which regenerates the dist directory) makes it work again, but only for a few runs before it fails again.

This is my folder structure.

my-nest-app/
├─ src/
│  ├─ bull-queue/
│  │  ├─ bull-queue.module.ts
│  │  ├─ file-queue/
│  │  │  ├─ file-queue-producer.service.ts
│  │  │  ├─ file-queue-consumer.service.ts
│  │  ├─ email-queue/
│  │  │  ├─ email-producer.service.ts
│  │  │  ├─ email-processor.service.ts

bull-queue.module.ts

import { BullModule } from '@nestjs/bull';
import { Global, Module } from '@nestjs/common';

export enum BullQueue {
    FileQueue = 'file-queue',
    EmailQueue = 'email-queue',
}

@Global()
@Module({
    imports: [
        BullModule.forRoot({
            redis: {
                host: process.env.REDIS_HOST,
                port: parseInt(process.env.REDIS_PORT ?? '', 10) || 6379
            }
        }),
        BullModule.registerQueue(
            { name: BullQueue.FileQueue },
            { name: BullQueue.EmailQueue }
        ),
        ProjectModule,
        FileModule
    ],
    providers: [
        BullQueueService,
        FileQueueConsumerService,
        FileQueueProducerService,
        EmailProcessorService,
        EmailProducerService
    ],
    controllers: [],
    exports: [FileQueueProducerService, EmailProducerService]
})
export class BullQueueModule {}
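
The Redis port above is read from an environment variable with a fallback. A minimal sketch of making that parsing explicit is shown here; `resolveRedisPort` is a hypothetical helper name, not part of @nestjs/bull, and the range check is an added assumption.

```typescript
// Hypothetical helper (not part of @nestjs/bull): resolve a TCP port from an
// environment-style string, falling back to a default when unset or invalid.
function resolveRedisPort(raw: string | undefined, fallback: number = 6379): number {
    // An unset variable becomes '', which parses to NaN.
    const parsed = Number.parseInt(raw ?? '', 10);
    // Reject NaN and anything outside the valid TCP port range.
    const valid = Number.isInteger(parsed) && parsed >= 1 && parsed <= 65535;
    return valid ? parsed : fallback;
}
```

With this in place, the config line could read `port: resolveRedisPort(process.env.REDIS_PORT)`, so a missing or malformed value falls back to 6379 instead of being passed to the Redis client.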

email-producer.service.ts

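import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

export enum EmailJobs {
    PasswordReset = 'password_reset',
}

@Injectable()
export class EmailProducerService {
    constructor(@InjectQueue(BullQueue.EmailQueue) private readonly queue: Queue) {}

    // Adds a named job; only a @Process(EmailJobs.PasswordReset) handler will pick it up.
    async password_reset_email(payload: PasswordResetPayload) {
        await this.queue.add(EmailJobs.PasswordReset, payload);
    }
}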

email-processor.service.ts

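import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor(BullQueue.EmailQueue)
export class EmailProcessorService {

    constructor(
        private readonly loggerService: LoggerService,
        private readonly helperService: HelperService
    ) {}

    // Handles jobs added to the email queue under the name EmailJobs.PasswordReset.
    @Process(EmailJobs.PasswordReset)
    async password_reset(job: Job<PasswordResetPayload>) {
        console.log('password_reset Job Processor')
    }
}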

I was wondering if I violated any design principle or if this was just an issue with the queue package manager.
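
One scenario that would produce this symptom, offered as a hypothesis and not a diagnosis: a second process (for example a stale build, or another app pointed at the same Redis instance) consumes 'email-queue' without a handler for 'password_reset'. The sketch below is a toy in-memory model, not Bull itself. `ToyWorker`, `dispatch`, and the alternating assignment are all assumptions made for illustration, and the failure message only approximates what Bull reports for a named job with no matching handler.

```typescript
// Toy in-memory model, NOT Bull itself: it only mimics how named jobs are
// routed to handlers when several workers consume the same queue.
type Handler = (data: unknown) => void;

interface JobResult {
    name: string;
    worker: string;
    status: 'completed' | 'failed';
    reason?: string;
}

class ToyWorker {
    private readonly handlers = new Map<string, Handler>();

    constructor(public readonly id: string) {}

    // Rough analogue of a @Process('name') method on a processor class.
    register(jobName: string, handler: Handler): this {
        this.handlers.set(jobName, handler);
        return this;
    }

    run(jobName: string, data: unknown): JobResult {
        const handler = this.handlers.get(jobName);
        if (!handler) {
            // The worker took the job but has nothing registered for its name.
            return {
                name: jobName,
                worker: this.id,
                status: 'failed',
                reason: `Missing process handler for job type ${jobName}`,
            };
        }
        handler(data);
        return { name: jobName, worker: this.id, status: 'completed' };
    }
}

// Assumption: jobs alternate between workers. Real workers race for jobs,
// so the real pattern would not be this regular.
function dispatch(workers: ToyWorker[], jobNames: string[]): JobResult[] {
    return jobNames.map((name, i) => workers[i % workers.length].run(name, {}));
}

const current = new ToyWorker('current-build').register('password_reset', () => {});
const stale = new ToyWorker('stale-process'); // has no password_reset handler

const results = dispatch(
    [current, stale],
    ['password_reset', 'password_reset', 'password_reset', 'password_reset'],
);
const completed = results.filter((r) => r.status === 'completed').length;
console.log(`${completed} of ${results.length} jobs reached the handler`);
```

If this were the cause, the jobs that never reach the handler should show up as failed in the queue, and the number of clients connected to Redis would be higher than expected.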

Howard Scott