Skip to content

Instantly share code, notes, and snippets.

@omeroot
Created January 2, 2025 12:34
Show Gist options
  • Select an option

  • Save omeroot/a25c9f0edc4e8535ef10b525b3a8c689 to your computer and use it in GitHub Desktop.

Select an option

Save omeroot/a25c9f0edc4e8535ef10b525b3a8c689 to your computer and use it in GitHub Desktop.
import { Queue, Worker, QueueEvents } from "bullmq";
import type { QueueOptions, Processor, QueueEventsListener } from "bullmq";
/**
 * Shape of a message that can be enqueued through `ScraperQueue.addToQueue`.
 *
 * The whole object is handed to BullMQ as the job's data, so it is expected to
 * be serializable — NOTE(review): confirm that callers only pass JSON-safe values.
 */
export type QueueMessage = {
// Passed as the first argument to `Queue.add`, i.e. used as the job name.
action: string;
// Optional payload for the action; its structure is not constrained here.
data?: any;
// Optional extra payload; presumably metadata accompanying `data` — TODO confirm with callers.
additional?: any;
};
/**
 * First argument received by callbacks registered through `ScraperQueue.listen`.
 */
export type QueueEventCallbackArgs = {
// Identifier of the job the event refers to.
jobId: string;
// Presumably the state the job was in before the event fired; only some
// events are expected to supply it — TODO confirm against BullMQ's event payloads.
prev?: string;
};
/**
 * Small convenience wrapper around a BullMQ `Queue` and its `QueueEvents`
 * stream, sharing one queue name and one connection configuration.
 *
 * Errors emitted by the queue, the event stream, and every worker created via
 * {@link ScraperQueue.readFromQueue} are logged with `console.error` so that
 * they never surface as unhandled `"error"` events.
 */
export class ScraperQueue {
  private readonly name: string;
  private readonly queue: Queue;
  private readonly queueEvents: QueueEvents;
  // Workers created by readFromQueue, remembered so close() can shut them down.
  private readonly workers: Worker[] = [];

  /**
   * @param name Name of the queue; also used for the event stream and workers.
   * @param opts Options forwarded to the BullMQ `Queue`; `opts.connection` is
   *   reused for the `QueueEvents` instance.
   */
  constructor(name: string, opts: QueueOptions) {
    this.name = name;

    // Initialize the queue with the given name and options
    this.queue = new Queue(this.name, opts);

    // Handle errors from the queue
    this.queue.on("error", (err) => {
      console.error(err);
    });

    // Handle Redis connection closure
    this.queue.on("ioredis:close", () => {
      console.error("Redis connection closed");
    });

    // Initialize queue events for monitoring
    this.queueEvents = new QueueEvents(this.name, {
      connection: opts.connection,
    });

    // Without a listener, an "error" emitted by the event stream would be
    // treated by Node as an unhandled error event.
    this.queueEvents.on("error", (err) => {
      console.error(err);
    });
  }

  /**
   * Add a message to the queue, using `message.action` as the job name and the
   * whole message as the job data.
   *
   * @returns The promise from `Queue.add`, resolving to the created job.
   *   Callers may ignore it (as before) or await it to observe enqueue failures.
   */
  addToQueue(message: QueueMessage) {
    return this.queue.add(message.action, message);
  }

  /**
   * Create a worker that processes jobs from this queue.
   *
   * @param runner Function invoked for each job; its result becomes the job's
   *   return value.
   * @returns The created worker, so callers can attach further listeners.
   */
  readFromQueue(runner: Processor): Worker {
    const worker = new Worker(this.name, runner, {
      connection: this.queue.opts.connection,
    });

    // BullMQ recommends always attaching an error listener to workers.
    worker.on("error", (err) => {
      console.error(err);
    });

    this.workers.push(worker);
    return worker;
  }

  /**
   * Retrieve a job from the queue by its ID.
   *
   * @returns Whatever `Queue.getJob` yields for that ID.
   */
  getJob(jobId: string) {
    return this.queue.getJob(jobId);
  }

  /**
   * Listen for a specific event on the queue's event stream.
   *
   * @param event Name of the `QueueEvents` event to subscribe to.
   * @param callback Invoked with the event arguments and the event id.
   */
  listen(
    event: keyof QueueEventsListener,
    callback: (args: QueueEventCallbackArgs, id: string) => void
  ) {
    this.queueEvents.on(event, callback);
  }

  /**
   * Release every connection held by this instance: workers first, so that no
   * job is picked up while the queue is going away, then the event stream and
   * the queue itself.
   */
  async close(): Promise<void> {
    // splice(0) empties the list so a second close() does not revisit workers.
    const activeWorkers = this.workers.splice(0);
    await Promise.all(activeWorkers.map((worker) => worker.close()));
    await Promise.all([this.queueEvents.close(), this.queue.close()]);
  }
}
@omeroot
Copy link
Copy Markdown
Author

omeroot commented Jan 2, 2025

import { Job } from "bullmq";
import { ScraperQueue } from "./bullmq";

// Redis endpoint that backs the example queue.
const redisConnection = {
  host: "localhost",
  port: 6379,
};

const scraperQueue = new ScraperQueue("ycomb", { connection: redisConnection });

// Log every "added" event together with its event id.
scraperQueue.listen("added", (args, id) => {
  console.log(args, id);
});

// Enqueue a single lookup job for the Hacker News front page.
console.log("Adding job to queue...");
const lookupMessage = {
  action: "lookup",
  data: {
    url: "news.ycombinator.com",
  },
};
scraperQueue.addToQueue(lookupMessage);

// Processor: logs the job id and resolves with a JSON string as the job result.
const handleJob = async (job: Job) => {
  console.log("Readed news", job.id);
  const payload = { result: "omer", x: job.id };
  return JSON.stringify(payload);
};

scraperQueue.readFromQueue(handleJob);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment