Skip to content

Instantly share code, notes, and snippets.

@wilsonowilson
Created August 22, 2025 06:56
Show Gist options
  • Select an option

  • Save wilsonowilson/1b269939de27ccb22b404779512fc6c5 to your computer and use it in GitHub Desktop.

Select an option

Save wilsonowilson/1b269939de27ccb22b404779512fc6c5 to your computer and use it in GitHub Desktop.
Bullmq wrapper
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { HonoAdapter } from '@bull-board/hono';
import { serveStatic } from '@hono/node-server/serve-static';
import {
Job,
type JobsOptions,
MetricsTime,
type Processor,
Queue,
QueueEvents,
type QueueOptions,
Worker,
type WorkerOptions
} from 'bullmq';
import Redis from 'ioredis';
import { Duration } from './duration';
const connection = new Redis(process.env.EVENTS_REDIS_URL!, {
maxRetriesPerRequest: null
});
type AugmentedQueue<T> = Omit<Queue<T>, 'add'> & {
events: QueueEvents;
add(data: T, opts?: JobsOptions): ReturnType<Queue<T>['add']>;
addNamed(name: string, data: T, opts?: JobsOptions): ReturnType<Queue<T>['add']>;
getJob: (jobId: string) => Job;
};
type RegisteredQueue = {
queue: Queue;
queueEvents: QueueEvents;
worker: Worker;
};
declare global {
var __registeredQueues: Record<string, RegisteredQueue> | undefined;
}
const registeredQueues = global.__registeredQueues || (global.__registeredQueues = {});
const DEFAULT_JOB_NAME = 'default';
/**
* Register a queue with a processor and optional configuration
* @param name Unique name of the queue
* @param processor The job processor function
* @param options Optional configuration for worker and queue
*/
export function registerQueue<T = any>(
name: string,
options: {
worker?: Omit<WorkerOptions, 'connection'>;
queue?: Omit<QueueOptions, 'connection'>;
},
processor: Processor<T>
): AugmentedQueue<T> {
const dev = process.env.PRODUCTION !== 'true';
name = dev ? `${name}-dev` : name;
if (!registeredQueues[name]) {
const queue = new Queue(name, {
connection,
...options?.queue,
defaultJobOptions: {
...options?.queue?.defaultJobOptions
}
});
const queueEvents = new QueueEvents(name, { connection });
const worker = new Worker<T>(name, processor, {
connection,
...options?.worker,
...defaultWorkerOptions
});
worker.on('failed', (job, err) => {
console.error(`❌ ${name}: ${job?.id} has failed with ${err.message}`, err);
});
worker.on('completed', (job) => {
console.info(`✅ ${name}: ${job.id} has completed!`);
});
registeredQueues[name] = { queue, queueEvents, worker };
bullBoard.addQueue(new BullMQAdapter(queue));
}
const { queue, queueEvents } = registeredQueues[name];
// @ts-ignore
const augmentedQueue: AugmentedQueue<T> = {
...queue,
events: queueEvents,
// @ts-ignore
add: (data: T, opts?: JobsOptions) => queue.add(DEFAULT_JOB_NAME, data, opts),
// @ts-ignore
addNamed: (name: string, data: T, opts?: JobsOptions) => queue.add(name, data, opts),
// @ts-ignore
addBulk: (jobs: { name: string; data: T; opts?: JobsOptions }[]) =>
queue.addBulk(jobs.map((job) => ({ name: job.name, data: job.data, opts: job.opts }))),
// @ts-ignore
getJob: (jobId: string) => Job.fromId(queue, jobId),
queue: queue
};
return augmentedQueue;
}
export const bullBoardServerAdapter = new HonoAdapter(serveStatic);
export const bullBoard = createBullBoard({
queues: [],
serverAdapter: bullBoardServerAdapter,
options: {
uiConfig: {
boardTitle: 'FERNDESK BULL',
miscLinks: [{ text: 'Logout', url: '/logout' }]
}
}
});
export const closeAllWorkers = async () => {
await Promise.all(Object.values(registeredQueues).map((e) => e.worker.close()));
};
export const defaultWorkerOptions: Partial<WorkerOptions> = {
metrics: {
maxDataPoints: MetricsTime.ONE_MONTH
},
removeOnComplete: {
age: Duration({ days: 7 }).toMilliseconds(),
count: 2000
},
removeOnFail: {
age: Duration({ days: 7 }).toMilliseconds(),
count: 5000
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment