Created
January 2, 2025 12:34
-
-
Save omeroot/a25c9f0edc4e8535ef10b525b3a8c689 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { Queue, Worker, QueueEvents } from "bullmq"; | |
| import type { QueueOptions, Processor, QueueEventsListener } from "bullmq"; | |
| // Define the structure of a message to be added to the queue | |
| export type QueueMessage = { | |
| action: string; | |
| data?: any; | |
| additional?: any; | |
| }; | |
| // Define the structure of arguments passed to event callbacks | |
| export type QueueEventCallbackArgs = { | |
| jobId: string; | |
| prev?: string; | |
| }; | |
| export class ScraperQueue { | |
| private name: string; | |
| private queue: Queue; | |
| private queueEvents: QueueEvents; | |
| constructor(name: string, opts: QueueOptions) { | |
| this.name = name; | |
| // Initialize the queue with the given name and options | |
| this.queue = new Queue(this.name, opts); | |
| // Handle errors from the queue | |
| this.queue.on("error", (err) => { | |
| console.error(err); | |
| }); | |
| // Handle Redis connection closure | |
| this.queue.on("ioredis:close", () => { | |
| console.error("Redis connection closed"); | |
| }); | |
| // Initialize queue events for monitoring | |
| this.queueEvents = new QueueEvents(this.name, { | |
| connection: opts.connection, | |
| }); | |
| } | |
| // Add a message to the queue | |
| addToQueue(message: QueueMessage) { | |
| this.queue.add(message.action, message); | |
| } | |
| // Create a worker to process jobs from the queue | |
| // Runner is your function it is running job | |
| readFromQueue(runner: Processor) { | |
| const worker = new Worker(this.name, runner, { | |
| connection: this.queue.opts.connection, | |
| }); | |
| return worker; | |
| } | |
| // Retrieve a job from the queue by its ID | |
| getJob(jobId: string) { | |
| return this.queue.getJob(jobId); | |
| } | |
| // Listen for specific events on the queue | |
| listen( | |
| event: keyof QueueEventsListener, | |
| callback: (args: QueueEventCallbackArgs, id: string) => void | |
| ) { | |
| this.queueEvents.on(event, callback); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.