@eranbetzalel
Last active November 14, 2024 13:12
BatchWorkerHost extends the NestJS BullMQ WorkerHost so that jobs are processed in batches rather than one at a time. Jobs accumulate until the batch reaches a configured size or a timer interval elapses, and each job completes only once its batch has been processed. This helps when the underlying work is cheaper in bulk, for example bulk database writes.
import {WorkerHost} from '@nestjs/bullmq';
import {Job} from 'bullmq';

export class BatchWorkerHost extends WorkerHost {
    private jobBatchCreationTime: Date;
    private jobBatch: Job[];
    private jobBatchProcessPromise: Promise<void>;
    private running: boolean = false;
    private resolveJobBatchProcessPromise: () => void;
    private rejectJobBatchProcessPromise: (error: Error) => void;

    constructor(private readonly batchSize: number, private readonly batchInterval: number) {
        super();
        this.initializeJobBatch();
        this.startBatchProcessing();
    }

    // Starts a new, empty batch with its own shared promise.
    initializeJobBatch(): void {
        this.jobBatchCreationTime = new Date();
        this.jobBatch = [];
        this.jobBatchProcessPromise = new Promise<void>((resolve, reject) => {
            this.resolveJobBatchProcessPromise = resolve;
            this.rejectJobBatchProcessPromise = reject;
        });
    }

    // Called once per job; settles when the batch containing the job has been processed.
    async process(job: Job): Promise<void> {
        this.jobBatch.push(job);
        // Capture the promise first: processBatchWrapper() replaces it with the next batch's promise.
        const jobBatchProcessPromise = this.jobBatchProcessPromise;
        if (this.jobBatch.length >= this.batchSize) {
            this.processBatchWrapper();
        }
        await jobBatchProcessPromise;
    }

    // Flushes a partially filled batch every batchInterval milliseconds.
    private startBatchProcessing(): void {
        setInterval(() => {
            if (this.jobBatch.length > 0) {
                this.processBatchWrapper();
            }
        }, this.batchInterval);
    }

    private processBatchWrapper(): void {
        // Only one batch is processed at a time; skipped jobs wait for the next trigger.
        if (this.running) {
            return;
        }
        this.running = true;
        // Capture the current batch's state before initializeJobBatch() overwrites it,
        // so the log lines below report this batch rather than the next one.
        const jobBatch = this.jobBatch;
        const jobBatchCreationTime = this.jobBatchCreationTime;
        const resolveJobBatchProcessPromise = this.resolveJobBatchProcessPromise;
        const rejectJobBatchProcessPromise = this.rejectJobBatchProcessPromise;
        this.initializeJobBatch();
        console.log(`Processing batch ${jobBatchCreationTime.toISOString()} - ${jobBatch.length} jobs`);
        this.processBatch(jobBatch)
            .then(() => {
                resolveJobBatchProcessPromise();
            })
            .catch(error => {
                console.error(
                    `Error processing batch ${jobBatchCreationTime.toISOString()} - ${jobBatch.length} jobs`,
                    error instanceof Error ? error.stack : error,
                );
                rejectJobBatchProcessPromise(error);
            })
            .finally(() => {
                this.running = false;
            });
    }

    // Subclasses override this with the actual batch logic.
    protected async processBatch(jobs: Job[]): Promise<void> {
        throw new Error('You must implement the processBatch method in your subclass.');
    }
}
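
The core of the class above is a single promise shared by every job in the current batch: each caller captures it before a flush can swap it for the next batch's promise. The following dependency-free sketch isolates that pattern. `SharedPromiseBatcher`, `add`, and `flush` are illustrative names, not part of BullMQ or NestJS, and the interval timer is left out, so partial batches are flushed by hand.

```typescript
// Hypothetical sketch of the shared-promise batching pattern (not a BullMQ API).
type BatchHandler<T> = (items: T[]) => Promise<void>;

class SharedPromiseBatcher<T> {
  private readonly batchSize: number;
  private readonly handler: BatchHandler<T>;
  private items: T[] = [];
  private batchPromise!: Promise<void>;
  private resolveBatch!: () => void;
  private rejectBatch!: (error: unknown) => void;

  constructor(batchSize: number, handler: BatchHandler<T>) {
    this.batchSize = batchSize;
    this.handler = handler;
    this.reset();
  }

  // Starts an empty batch with a fresh promise and its settle callbacks.
  private reset(): void {
    this.items = [];
    this.batchPromise = new Promise<void>((resolve, reject) => {
      this.resolveBatch = resolve;
      this.rejectBatch = reject;
    });
  }

  // Returns a promise that settles when the batch containing `item` is handled.
  add(item: T): Promise<void> {
    this.items.push(item);
    const promise = this.batchPromise; // capture before flush() replaces it
    if (this.items.length >= this.batchSize) {
      this.flush();
    }
    return promise;
  }

  // Hands the current batch to the handler and immediately opens a new batch.
  flush(): void {
    if (this.items.length === 0) {
      return;
    }
    const items = this.items;
    const resolve = this.resolveBatch;
    const reject = this.rejectBatch;
    this.reset();
    this.handler(items).then(resolve, reject);
  }
}
```

Unlike the class above, this sketch has no `running` guard, so two batches may be in flight at once; it is meant only to show why the promise has to be captured before the batch is reset.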

Usage

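To use this class, create a subclass and implement the processBatch method to define how each batch of jobs is processed. Because process() does not resolve until the job's batch has been processed, the worker needs a concurrency at least as large as the batch size for a batch to fill before the interval fires; at a lower concurrency, batches are flushed only by the timer.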

import { BatchWorkerHost } from './batch-worker-host';
import { Job } from 'bullmq';

class MyCustomBatchWorker extends BatchWorkerHost {
    constructor() {
        super(200, 5000); // Batch size of 200 and interval of 5000ms
    }

    protected async processBatch(jobs: Job[]): Promise<void> {
        // Custom batch processing logic
        for (const job of jobs) {
            // Process each job
        }
    }
}
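
Looping over jobs one at a time gives up most of the benefit of batching, so a processBatch body would typically collect the job payloads and issue one bulk call. The sketch below shows that shape under stated assumptions: `JobLike` is a minimal structural stand-in for the `id` and `data` fields of a BullMQ `Job`, while `UserEvent`, `BulkWriter`, and `processBatchInBulk` are hypothetical names invented for illustration.

```typescript
// Minimal stand-in for the parts of a BullMQ Job used here (assumption for self-containment).
interface JobLike<T> {
  id?: string;
  data: T;
}

// Hypothetical payload and bulk sink, e.g. a multi-row database insert.
interface UserEvent {
  userId: string;
  action: string;
}
type BulkWriter = (rows: UserEvent[]) => Promise<void>;

// Collects every job's payload and writes them with a single bulk call.
async function processBatchInBulk(
  jobs: JobLike<UserEvent>[],
  writeMany: BulkWriter,
): Promise<void> {
  const rows = jobs.map((job) => job.data);
  await writeMany(rows);
}
```

If the bulk call rejects, the shared batch promise rejects and every job in that batch fails together, so the operation should be safe to repeat for jobs that are retried.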
