Created
August 22, 2025 06:56
-
-
Save wilsonowilson/1b269939de27ccb22b404779512fc6c5 to your computer and use it in GitHub Desktop.
BullMQ wrapper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { createBullBoard } from '@bull-board/api'; | |
| import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; | |
| import { HonoAdapter } from '@bull-board/hono'; | |
| import { serveStatic } from '@hono/node-server/serve-static'; | |
| import { | |
| Job, | |
| type JobsOptions, | |
| MetricsTime, | |
| type Processor, | |
| Queue, | |
| QueueEvents, | |
| type QueueOptions, | |
| Worker, | |
| type WorkerOptions | |
| } from 'bullmq'; | |
| import Redis from 'ioredis'; | |
| import { Duration } from './duration'; | |
// Redis connection shared by every Queue, QueueEvents and Worker created in this
// file. The URL comes from EVENTS_REDIS_URL and per-request retry limiting is
// turned off (maxRetriesPerRequest: null).
const redisUrl = process.env.EVENTS_REDIS_URL!;
const connection = new Redis(redisUrl, { maxRetriesPerRequest: null });
/**
 * Queue handle returned by `registerQueue`: the BullMQ `Queue` API with `add`
 * and `getJob` replaced by the simplified versions below, plus a few extras.
 */
type AugmentedQueue<T> = Omit<Queue<T>, 'add' | 'getJob'> & {
  /** The `QueueEvents` instance created alongside the queue. */
  events: QueueEvents;
  /** Enqueue a job under the default job name. */
  add(data: T, opts?: JobsOptions): ReturnType<Queue<T>['add']>;
  /** Enqueue a job under an explicit job name. */
  addNamed(name: string, data: T, opts?: JobsOptions): ReturnType<Queue<T>['add']>;
  /**
   * Look a job up by id. The implementation delegates to `Job.fromId`, which is
   * asynchronous and resolves to `undefined` when no job has that id.
   */
  getJob: (jobId: string) => Promise<Job<T> | undefined>;
};
/** The three BullMQ objects kept in the registry for each registered queue name. */
interface RegisteredQueue {
  /** Queue used to enqueue and inspect jobs. */
  queue: Queue;
  /** Event stream for the same queue name. */
  queueEvents: QueueEvents;
  /** Worker that runs the processor for this queue. */
  worker: Worker;
}
declare global {
  // Slot on the global object holding the queue registry, keyed by queue name.
  var __registeredQueues: Record<string, RegisteredQueue> | undefined;
}

/** Job name used by `add` when the caller does not provide one. */
const DEFAULT_JOB_NAME = 'default';

// Reuse the registry already stored on the global object if there is one;
// otherwise create an empty registry and store it there.
const registeredQueues = ((): Record<string, RegisteredQueue> => {
  const existing = global.__registeredQueues;
  if (existing) {
    return existing;
  }
  const created: Record<string, RegisteredQueue> = {};
  global.__registeredQueues = created;
  return created;
})();
/**
 * Create the Queue, QueueEvents and Worker for a queue name, attach the worker's
 * logging listeners, store all three in the registry and add the queue to the
 * Bull Board.
 */
function createRegisteredQueue<T>(
  queueName: string,
  options: {
    worker?: Omit<WorkerOptions, 'connection'>;
    queue?: Omit<QueueOptions, 'connection'>;
  },
  processor: Processor<T>
): RegisteredQueue {
  const queue = new Queue(queueName, {
    connection,
    ...options?.queue,
    defaultJobOptions: {
      ...options?.queue?.defaultJobOptions
    }
  });
  const queueEvents = new QueueEvents(queueName, { connection });
  const worker = new Worker<T>(queueName, processor, {
    // Defaults come first so that caller-supplied worker options win over them.
    ...defaultWorkerOptions,
    ...options?.worker,
    connection
  });

  worker.on('failed', (job, err) => {
    console.error(`❌ ${queueName}: ${job?.id} has failed with ${err.message}`, err);
  });
  worker.on('completed', (job) => {
    console.info(`✅ ${queueName}: ${job.id} has completed!`);
  });
  // An EventEmitter with no 'error' listener throws when 'error' is emitted;
  // log worker-level errors instead of letting them surface as uncaught exceptions.
  worker.on('error', (err) => {
    console.error(`❌ ${queueName}: worker error: ${err.message}`, err);
  });

  const entry: RegisteredQueue = { queue, queueEvents, worker };
  registeredQueues[queueName] = entry;
  bullBoard.addQueue(new BullMQAdapter(queue));
  return entry;
}

/**
 * Register a queue with a processor and optional configuration.
 *
 * The first call for a given name creates the BullMQ `Queue`, `QueueEvents` and
 * `Worker`, stores them in the registry and adds the queue to the Bull Board.
 * Later calls with the same name return a handle to the instances that are
 * already registered; their `options` and `processor` arguments are not used.
 *
 * Unless `process.env.PRODUCTION` is exactly `'true'`, `-dev` is appended to the
 * queue name.
 *
 * @param name Unique name of the queue
 * @param options Optional configuration for worker and queue
 * @param processor The job processor function
 * @returns A handle that forwards to the underlying queue and additionally
 *   exposes `events`, `add` (default job name), `addNamed`, `addBulk`, `getJob`
 *   and the raw `queue`.
 */
export function registerQueue<T = any>(
  name: string,
  options: {
    worker?: Omit<WorkerOptions, 'connection'>;
    queue?: Omit<QueueOptions, 'connection'>;
  },
  processor: Processor<T>
): AugmentedQueue<T> {
  const isDev = process.env.PRODUCTION !== 'true';
  const queueName = isDev ? `${name}-dev` : name;

  const entry =
    registeredQueues[queueName] ?? createRegisteredQueue<T>(queueName, options, processor);
  const { queue, queueEvents } = entry;

  // Members layered on top of (or replacing) the underlying queue's own API.
  const overrides: Record<PropertyKey, unknown> = {
    events: queueEvents,
    add: (data: T, opts?: JobsOptions) => queue.add(DEFAULT_JOB_NAME, data, opts),
    addNamed: (jobName: string, data: T, opts?: JobsOptions) => queue.add(jobName, data, opts),
    addBulk: (jobs: { name: string; data: T; opts?: JobsOptions }[]) =>
      queue.addBulk(jobs.map((job) => ({ name: job.name, data: job.data, opts: job.opts }))),
    getJob: (jobId: string) => Job.fromId(queue, jobId),
    queue
  };

  // Object spread copies only an instance's own enumerable properties, so a
  // `{ ...queue }` handle would lack every method defined on the Queue prototype.
  // A proxy serves the overrides and forwards everything else to the real queue,
  // binding methods so they run with the queue as `this`.
  const handle = new Proxy(queue, {
    get(target, prop) {
      if (Object.prototype.hasOwnProperty.call(overrides, prop)) {
        return overrides[prop];
      }
      const value: unknown = Reflect.get(target, prop, target);
      return typeof value === 'function' ? value.bind(target) : value;
    }
  });

  // The proxy's static type is still `Queue`; the cast describes the overridden surface.
  return handle as unknown as AugmentedQueue<T>;
}
/** Hono server adapter for the Bull Board UI, constructed with `serveStatic`. */
export const bullBoardServerAdapter = new HonoAdapter(serveStatic);

// UI settings for the board: its title and an extra "Logout" link.
const boardUiConfig = {
  boardTitle: 'FERNDESK BULL',
  miscLinks: [{ text: 'Logout', url: '/logout' }]
};

/**
 * The Bull Board instance. It starts with no queues; `registerQueue` adds each
 * queue as it is created.
 */
export const bullBoard = createBullBoard({
  serverAdapter: bullBoardServerAdapter,
  queues: [],
  options: { uiConfig: boardUiConfig }
});
/**
 * Close the worker of every registered queue. All `close()` calls are started
 * first and then awaited together; queues and event streams are not closed here.
 */
export const closeAllWorkers = async () => {
  const closing: Promise<unknown>[] = [];
  for (const { worker } of Object.values(registeredQueues)) {
    closing.push(worker.close());
  }
  await Promise.all(closing);
};
// BullMQ's `KeepJobs.age` is expressed in seconds, while the Duration helper
// yields milliseconds here, so convert before using it as a retention age.
const JOB_RETENTION_SECONDS = Math.floor(Duration({ days: 7 }).toMilliseconds() / 1000);

/**
 * Worker options that `registerQueue` spreads into every worker it creates:
 * metrics collection plus retention limits for finished jobs.
 */
export const defaultWorkerOptions: Partial<WorkerOptions> = {
  metrics: {
    maxDataPoints: MetricsTime.ONE_MONTH
  },
  // Keep completed jobs for at most 7 days and at most 2000 of them.
  removeOnComplete: {
    age: JOB_RETENTION_SECONDS,
    count: 2000
  },
  // Keep failed jobs for at most 7 days and at most 5000 of them.
  removeOnFail: {
    age: JOB_RETENTION_SECONDS,
    count: 5000
  }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment