-
-
Save umaar/ae7adb35d86c0f7d08d7269d4ed34449 to your computer and use it in GitHub Desktop.
This class provides an efficient and flexible way to handle batch processing of jobs in a BullMQ environment, leveraging NestJS. It ensures that jobs are processed in batches, which can be crucial for performance optimization in applications requiring bulk operations..
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {WorkerHost} from '@nestjs/bullmq'; | |
import {Job} from 'bullmq'; | |
export class BatchWorkerHost extends WorkerHost { | |
private jobBatchCreationTime: Date; | |
private jobBatch: Job[]; | |
private jobBatchProcessPromise: Promise<void>; | |
private running: boolean = false; | |
private resolveJobBatchProcessPromise: () => void; | |
private rejectJobBatchProcessPromise: (error: Error) => void; | |
constructor(private readonly batchSize: number, private readonly batchInterval: number) { | |
super(); | |
this.initializeJobBatch(); | |
this.startBatchProcessing(); | |
} | |
initializeJobBatch(): void { | |
this.jobBatchCreationTime = new Date(); | |
this.jobBatch = []; | |
this.jobBatchProcessPromise = new Promise<void>((resolve, reject) => { | |
this.resolveJobBatchProcessPromise = resolve; | |
this.rejectJobBatchProcessPromise = reject; | |
}); | |
} | |
async process(job: Job): Promise<void> { | |
this.jobBatch.push(job); | |
const jobBatchProcessPromise = this.jobBatchProcessPromise; | |
if (this.jobBatch.length >= this.batchSize) { | |
this.processBatchWrapper(); | |
} | |
await jobBatchProcessPromise; | |
} | |
private startBatchProcessing(): void { | |
setInterval(() => { | |
if (this.jobBatch.length > 0) { | |
this.processBatchWrapper(); | |
} | |
}, this.batchInterval); | |
} | |
private processBatchWrapper(): void { | |
if (this.running) { | |
return; | |
} | |
this.running = true; | |
const jobBatch = this.jobBatch; | |
const resolveJobBatchProcessPromise = this.resolveJobBatchProcessPromise; | |
const rejectJobBatchProcessPromise = this.rejectJobBatchProcessPromise; | |
this.initializeJobBatch(); | |
console.log(`Processing batch ${this.jobBatchCreationTime.toISOString()} - ${jobBatch.length} jobs`); | |
this.processBatch(jobBatch) | |
.then(() => { | |
resolveJobBatchProcessPromise(); | |
}) | |
.catch(error => { | |
console.error(`Error processing batch ${this.jobBatchCreationTime.toISOString()} - ${jobBatch.length} jobs`, error.stack); | |
rejectJobBatchProcessPromise(error); | |
}) | |
.finally(() => { | |
this.running = false; | |
}); | |
} | |
protected async processBatch(jobs: Job[]): Promise<void> { | |
throw new Error('You must implement the processBatch method in your subclass.'); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment