Created
February 26, 2022 20:22
-
-
Save m5r/b2f1f0d044bba435d58aab67e82cf79b to your computer and use it in GitHub Desktop.
bullmq job queue in Remix
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import notifierQueue from "~/queues/notifier.server.ts"; | |
export const loader = async () => { | |
await notifierQueue.add("test", { emailAddress: "[email protected]" }); | |
return null; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Queue } from "~/utils/queue.server"; | |
type QueueData = { | |
emailAddress: string; | |
}; | |
export const queue = Queue<QueueData>("notifier", async (job) => { | |
console.log(`Sending email to ${job.data.emailAddress}`); | |
// Delay 1 second to simulate sending an email, be it for user registration, a newsletter, etc. | |
await new Promise((resolve) => setTimeout(resolve, 1000)); | |
console.log(`Email sent to ${job.data.emailAddress}`); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
type Processor, | |
Queue as BullQueue, | |
Worker, | |
QueueScheduler, | |
} from "bullmq"; | |
import redis from "./redis.server"; | |
type RegisteredQueue = { | |
queue: BullQueue; | |
worker: Worker; | |
scheduler: QueueScheduler; | |
}; | |
declare global { | |
var __registeredQueues: Record<string, RegisteredQueue> | undefined; | |
} | |
const registeredQueues = global.__registeredQueues || (global.__registeredQueues = {}); | |
export function Queue<Payload>(name: string, handler: Processor<Payload>): BullQueue<Payload> { | |
if (registeredQueues[name]) { | |
return registeredQueues[name].queue; | |
} | |
const queue = new BullQueue<Payload>(name, { connection: redis }); | |
const worker = new Worker<Payload>(name, handler, { connection: redis }); | |
const scheduler = new QueueScheduler(name, { connection: redis }); | |
registeredQueues[name] = { queue, scheduler, worker }; | |
return queue; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Redis, { type Redis as RedisType, type RedisOptions } from "ioredis"; | |
let redis: RedisType; | |
declare global { | |
var __redis: RedisType | undefined; | |
} | |
const redisOptions: RedisOptions = { | |
maxRetriesPerRequest: null, | |
enableReadyCheck: false, | |
}; | |
// this is needed because in development we don't want to restart | |
// the server with every change, but we want to make sure we don't | |
// create a new connection to the Redis with every change either. | |
if (process.env.NODE_ENV === "production") { | |
redis = new Redis(process.env.REDIS_URL, redisOptions); | |
} else { | |
if (!global.__redis) { | |
global.__redis = new Redis(process.env.REDIS_URL, redisOptions); | |
} | |
redis = global.__redis; | |
} | |
export default redis; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment