Created
May 26, 2023 02:50
-
-
Save wermarter/31d7b8ac183bab79e3f98271a4ee0b75 to your computer and use it in GitHub Desktop.
nestjs-cargo-queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Generic worker that funnels individually pushed tasks through a cargo queue.
 *
 * Each batch the queue hands over is run through `batchProcessor`, and the
 * resulting outputs are then forwarded to `batchCallback`.
 *
 * NOTE(review): `cargoQueue` / `QueueObject` are presumably the ones from the
 * `async` library, where the second and third arguments are the number of
 * parallel workers and the maximum batch size — confirm against the imports.
 */
export class BaseWorkerService<TaskInput, TaskOutput> {
  private queue: QueueObject<TaskInput>;
  private logger: Logger;
  /** Number of batches processed so far; only used to tag log lines. */
  private workCounter = 0;

  /**
   * @param workerName     Label embedded in the logger context (`worker[<name>]`).
   * @param concurrency    Forwarded to `cargoQueue` as its second argument.
   * @param batchSize      Forwarded to `cargoQueue` as its third argument.
   * @param batchProcessor Turns a batch of inputs into a batch of outputs.
   * @param batchCallback  Receives the outputs once a batch has been processed.
   */
  constructor(
    workerName: string,
    concurrency: number,
    batchSize: number,
    batchProcessor: (input: TaskInput[]) => Promise<TaskOutput[]>,
    batchCallback: (output: TaskOutput[]) => Promise<void>,
  ) {
    this.logger = new Logger(`worker[${workerName}]`);

    // Queue worker: process the batch, log it, then hand the outputs over.
    const handleBatch = async (batch: TaskInput[]) => {
      const outputs = await batchProcessor(batch);

      // Bump the counter before logging so the first batch is reported as [1].
      this.workCounter += 1;
      this.logger.debug(
        `[${this.workCounter}] Processed ${outputs.length} tasks`,
      );

      await batchCallback(outputs);
    };

    this.queue = cargoQueue(handleBatch, concurrency, batchSize);
  }

  /**
   * Enqueues a single task.
   *
   * @param input The task to add to the queue.
   * @returns The promise produced by the queue's `pushAsync` for this task.
   */
  async push(input: TaskInput) {
    const pending = this.queue.pushAsync(input);
    return pending;
  }

  /** Stops the worker by calling `kill()` on the underlying queue. */
  terminate() {
    return this.queue.kill();
  }
}
/**
 * Demo worker: concurrency 2, batch size 2. Each batch is echoed back after a
 * two-second pause, and every echoed item is printed to the console.
 */
const timeoutWorker = new BaseWorkerService<string, string>(
  TIMEOUT_WORKER,
  2,
  2,
  // Processor: simulate slow work, then return the inputs unchanged.
  async (inputs: string[]) => {
    await new Promise((resolve) => {
      setTimeout(resolve, 2000);
    });
    return inputs;
  },
  // Callback: print each processed item on its own line.
  async (outputs) => {
    for (const output of outputs) {
      console.log('--', output);
    }
  },
);

/** Value provider that exposes the worker instance under the `TIMEOUT_WORKER` token. */
const timeoutProvider = {
  provide: TIMEOUT_WORKER,
  useValue: timeoutWorker,
};
/**
 * Nest module that publishes the pre-built timeout worker under the
 * `TIMEOUT_WORKER` injection token.
 *
 * `BaseWorkerService` itself is intentionally not listed as a class provider:
 * its constructor takes plain values (a name, two numbers and two callbacks)
 * that the Nest container has no way to supply, so registering the class
 * would at best create a useless, misconfigured extra instance. Concrete
 * workers are registered as value providers instead.
 */
@Module({
  imports: [],
  providers: [timeoutProvider],
  exports: [timeoutProvider],
})
export class WorkerModule {}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment