Last active
November 14, 2024 13:12
-
-
Save eranbetzalel/d1d5c8de6fb9e125bcff3d80f3846fb4 to your computer and use it in GitHub Desktop.
This class provides an efficient and flexible way to handle batch processing of jobs in a BullMQ environment, leveraging NestJS. It ensures that jobs are processed in batches, which can be crucial for performance optimization in applications requiring bulk operations..
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {WorkerHost} from '@nestjs/bullmq'; | |
import {Job} from 'bullmq'; | |
export class BatchWorkerHost extends WorkerHost { | |
private jobBatchCreationTime: Date; | |
private jobBatch: Job[]; | |
private jobBatchProcessPromise: Promise<void>; | |
private running: boolean = false; | |
private resolveJobBatchProcessPromise: () => void; | |
private rejectJobBatchProcessPromise: (error: Error) => void; | |
constructor(private readonly batchSize: number, private readonly batchInterval: number) { | |
super(); | |
this.initializeJobBatch(); | |
this.startBatchProcessing(); | |
} | |
initializeJobBatch(): void { | |
this.jobBatchCreationTime = new Date(); | |
this.jobBatch = []; | |
this.jobBatchProcessPromise = new Promise<void>((resolve, reject) => { | |
this.resolveJobBatchProcessPromise = resolve; | |
this.rejectJobBatchProcessPromise = reject; | |
}); | |
} | |
async process(job: Job): Promise<void> { | |
this.jobBatch.push(job); | |
const jobBatchProcessPromise = this.jobBatchProcessPromise; | |
if (this.jobBatch.length >= this.batchSize) { | |
this.processBatchWrapper(); | |
} | |
await jobBatchProcessPromise; | |
} | |
private startBatchProcessing(): void { | |
setInterval(() => { | |
if (this.jobBatch.length > 0) { | |
this.processBatchWrapper(); | |
} | |
}, this.batchInterval); | |
} | |
private processBatchWrapper(): void { | |
if (this.running) { | |
return; | |
} | |
this.running = true; | |
const jobBatch = this.jobBatch; | |
const resolveJobBatchProcessPromise = this.resolveJobBatchProcessPromise; | |
const rejectJobBatchProcessPromise = this.rejectJobBatchProcessPromise; | |
this.initializeJobBatch(); | |
console.log(`Processing batch ${this.jobBatchCreationTime.toISOString()} - ${jobBatch.length} jobs`); | |
this.processBatch(jobBatch) | |
.then(() => { | |
resolveJobBatchProcessPromise(); | |
}) | |
.catch(error => { | |
console.error(`Error processing batch ${this.jobBatchCreationTime.toISOString()} - ${jobBatch.length} jobs`, error.stack); | |
rejectJobBatchProcessPromise(error); | |
}) | |
.finally(() => { | |
this.running = false; | |
}); | |
} | |
protected async processBatch(jobs: Job[]): Promise<void> { | |
throw new Error('You must implement the processBatch method in your subclass.'); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage
To utilize this class, create a subclass and implement the
processBatch
method to define how each batch of jobs should be processed.