Skip to content

Instantly share code, notes, and snippets.

@NuroDev
Created October 13, 2025 11:15
Show Gist options
  • Save NuroDev/aa1ad514b48d14bdd0b890084538aa8d to your computer and use it in GitHub Desktop.
Save NuroDev/aa1ad514b48d14bdd0b890084538aa8d to your computer and use it in GitHub Desktop.
⏭️ Queue Controller - A minimal `Hono` style controller for Cloudflare Workers `queue`
import { env } from 'cloudflare:workers';
import type { z } from 'zod';
export type QueueBody = {
payload?: unknown;
type: string;
};
type KeysOfType<T, V> = {
[K in keyof T]: T[K] extends V ? K : never;
}[keyof T];
type BaseQueueContext<E> = {
env: E;
executionCtx: ExecutionContext;
message: Message<QueueBody>;
};
type QueueContext<E, P = never> = [P] extends [never]
? BaseQueueContext<E>
: [P] extends [undefined]
? BaseQueueContext<E>
: BaseQueueContext<E> & { payload: P };
type EmptyQueueHandler<E> = (context: QueueContext<E>) => void | Promise<void>;
type ZodQueueHandler<E, S extends z.ZodType> = (
context: QueueContext<E, z.infer<S>>,
) => void | Promise<void>;
type PayloadQueueHandler<E, P> = (context: QueueContext<E, P>) => void | Promise<void>;
type SendMethods<H extends Record<string, unknown>> = {
[K in keyof H]: H[K] extends void
? () => Promise<void>
: H[K] extends undefined
? () => Promise<void>
: (payload: H[K], options?: QueueSendOptions) => Promise<void>;
};
type BatchMessageItem<T> = T extends undefined
? { options?: QueueSendOptions }
: { payload: T; options?: QueueSendOptions };
type SendBatchMethods<H extends Record<string, unknown>> = {
[K in keyof H]: (messages: Array<BatchMessageItem<H[K]>>) => Promise<void>;
};
// biome-ignore lint/complexity/noBannedTypes: We require an empty object type here
export class QueueController<E = Env, H extends Record<string, unknown> = {}> {
private _binding: KeysOfType<E, Queue>;
private _handlers = new Map<
string,
| EmptyQueueHandler<E>
| ZodQueueHandler<E, z.ZodType>
| PayloadQueueHandler<E, unknown>
>();
private _schemas = new Map<string, z.ZodType>();
constructor(binding: NoInfer<KeysOfType<E, Queue>>) {
this._binding = binding;
}
handler<T extends string>(
type: T,
handler: EmptyQueueHandler<E>,
): QueueController<E, H & Record<T, void>>;
handler<T extends string, S extends z.ZodType>(
type: T,
schema: S,
handler: ZodQueueHandler<E, S>,
): QueueController<E, H & Record<T, z.infer<S>>>;
handler<T extends string, S extends z.ZodType>(
type: T,
schemaOrHandler: S | EmptyQueueHandler<E> | PayloadQueueHandler<E, unknown>,
maybeHandler?: ZodQueueHandler<E, S> | EmptyQueueHandler<E>,
): QueueController<E, H & Record<T, S extends z.ZodType ? z.infer<S> : void>> {
if (maybeHandler) {
this._schemas.set(type, schemaOrHandler as S);
this._handlers.set(type, maybeHandler);
return this;
}
this._handlers.set(
type,
schemaOrHandler as EmptyQueueHandler<E> | PayloadQueueHandler<E, unknown>,
);
return this;
}
get process(): ExportedHandlerQueueHandler<E, QueueBody> {
return async (batch, env, ctx) => {
for (const message of batch.messages) {
const { type, payload } = message.body;
const handler = this._handlers.get(type);
if (!handler) {
console.error(`No handler found for message type: ${type}`);
message.retry({ delaySeconds: 30 });
continue;
}
try {
const schema = this._schemas.get(type);
const validatedPayload = schema ? schema.parse(payload) : payload;
await handler({
env,
executionCtx: ctx,
message,
payload: validatedPayload,
});
message.ack();
} catch (error) {
console.error(`Error processing message type ${type}:`, error);
message.retry({ delaySeconds: 30 });
}
}
};
}
get send(): SendMethods<H> {
return new Proxy({} as SendMethods<H>, {
get: (_, type) => async (payload?: unknown, options?: QueueSendOptions) => {
const binding = env[this._binding as keyof Env] as unknown as Queue;
await binding.send({ type, payload }, options);
},
});
}
get sendBatch(): SendBatchMethods<H> {
return new Proxy({} as SendBatchMethods<H>, {
get: (_, type) => async (messages: Array<BatchMessageItem<unknown>>) => {
const binding = env[this._binding as keyof Env] as unknown as Queue;
await binding.sendBatch(
messages.map((msg) => ({
body: {
type,
payload: 'payload' in msg ? msg.payload : undefined,
},
...(msg.options || {}),
})),
);
},
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment