Skip to content

Instantly share code, notes, and snippets.

@saiashirwad
Created May 25, 2023 11:17
Show Gist options
  • Save saiashirwad/6efbedd0894a8d64568dd14b36cf56fc to your computer and use it in GitHub Desktop.
Save saiashirwad/6efbedd0894a8d64568dd14b36cf56fc to your computer and use it in GitHub Desktop.
import {
Queue,
Worker,
type ConnectionOptions,
type Job,
type QueueOptions,
type RedisConnection,
type WorkerOptions,
} from "bullmq";
import { partial } from "ramda";
export type DefaultQueueContext = {
db: DB;
redis: RedisClient;
};
type Processor<
Context,
DataType = unknown,
ResultType = unknown,
NameType extends string = string,
> = (
ctx: Context,
job: Job<DataType, ResultType, NameType>,
) => Promise<ResultType>;
export type BullQueue<
Context,
DataType = unknown,
ResultType = unknown,
NameType extends string = string,
> = (
ctx: Context & {
connection: ConnectionOptions;
},
queueOptions?: QueueOptions,
) => {
queue: Queue<DataType, ResultType, NameType>;
worker: Worker<DataType, ResultType, NameType>;
};
export const createQueue =
<
DataType = unknown,
Context = DefaultQueueContext,
ResultType = unknown,
NameType extends string = string,
>(
name: string,
processor: Processor<Context, DataType, ResultType, NameType>,
workerOptions?: WorkerOptions,
Connection?: typeof RedisConnection,
): BullQueue<Context, DataType, ResultType, NameType> =>
(
ctx: Context & {
connection: ConnectionOptions;
},
queueOptions?: QueueOptions,
) => {
const { connection, ..._ctx } = ctx;
const queue = new Queue<DataType, ResultType, NameType>(
name,
{ connection, ...queueOptions },
Connection,
);
const worker = new Worker<DataType, ResultType, NameType>(
name,
partial(processor, [_ctx]),
workerOptions,
Connection,
);
return { queue, worker };
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment