Created
October 13, 2025 11:15
-
-
Save NuroDev/aa1ad514b48d14bdd0b890084538aa8d to your computer and use it in GitHub Desktop.
⏭️ Queue Controller - A minimal, `Hono`-style controller for Cloudflare Workers `queue` handlers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { env } from 'cloudflare:workers'; | |
| import type { z } from 'zod'; | |
/**
 * Envelope carried as the body of every message handled by `QueueController`.
 */
export type QueueBody = {
  /** Message type; used as the lookup key for the registered handler. */
  type: string;
  /** Optional data for the handler; validated first when a schema is registered. */
  payload?: unknown;
};
/**
 * Union of the keys of `T` whose property type is assignable to `V`.
 * Keys whose value does not match map to `never` and therefore drop out of
 * the resulting union.
 */
type KeysOfType<T, V> = {
  [Key in keyof T]: T[Key] extends V ? Key : never;
}[keyof T];
/**
 * Fields that every queue handler receives, regardless of whether the
 * message type carries a payload.
 */
type BaseQueueContext<E> = {
  /** The message currently being processed. */
  message: Message<QueueBody>;
  /** Environment object passed to the queue handler. */
  env: E;
  /** Execution context passed to the queue handler. */
  executionCtx: ExecutionContext;
};
/**
 * Context handed to a handler. When `P` is `never` (the default) or
 * `undefined`, only the base fields are present; otherwise a typed `payload`
 * field is added.
 *
 * The tuple wrapping (`[P]`) stops the conditional from distributing over
 * unions. `[never]` is assignable to `[undefined]`, so the single check below
 * covers both the `never` and the `undefined` case.
 */
type QueueContext<E, P = never> = [P] extends [undefined]
  ? BaseQueueContext<E>
  : BaseQueueContext<E> & { payload: P };
/** Handler for a message type that carries no payload. */
type EmptyQueueHandler<E> = (
  context: QueueContext<E>,
) => void | Promise<void>;

/** Handler whose payload type is inferred from the Zod schema `S`. */
type ZodQueueHandler<E, S extends z.ZodType> = (
  context: QueueContext<E, z.infer<S>>,
) => void | Promise<void>;

/** Handler that receives a payload of an explicitly given type `P`. */
type PayloadQueueHandler<E, P> = (
  context: QueueContext<E, P>,
) => void | Promise<void>;
/**
 * Typed `send` surface: one async method per registered message type.
 *
 * - Message types without a payload (`void` / `undefined`) may be called with
 *   no arguments, or as `(undefined, options)` when send options such as a
 *   delay are needed.
 * - All other message types require the payload and accept optional options.
 */
type SendMethods<H extends Record<string, unknown>> = {
  [K in keyof H]: H[K] extends void | undefined
    ? (payload?: undefined, options?: QueueSendOptions) => Promise<void>
    : (payload: H[K], options?: QueueSendOptions) => Promise<void>;
};
/**
 * One entry of a `sendBatch` call.
 *
 * Message types without a payload omit the `payload` field entirely. Both
 * `void` and `undefined` are checked: handlers registered without a schema
 * are recorded with a `void` payload type, and `void` is not assignable to
 * `undefined`, so checking `undefined` alone would wrongly demand a
 * `payload: void` property.
 */
type BatchMessageItem<T> = T extends void | undefined
  ? { options?: QueueSendOptions }
  : { payload: T; options?: QueueSendOptions };
/**
 * Typed `sendBatch` surface: one async method per registered message type,
 * each taking the list of batch items to enqueue for that type.
 */
type SendBatchMethods<H extends Record<string, unknown>> = {
  [Type in keyof H]: (
    messages: Array<BatchMessageItem<H[Type]>>,
  ) => Promise<void>;
};
/**
 * Minimal, Hono-style controller for a Cloudflare Workers queue.
 *
 * Handlers are registered per message `type` with {@link QueueController.handler};
 * `process` is the consumer entry point that dispatches each message of a
 * batch to its handler, and `send` / `sendBatch` are typed producers for the
 * registered message types.
 *
 * @typeParam E - Environment type; `binding` must be a key of `E` whose value is a `Queue`.
 * @typeParam H - Map of message type to payload type, accumulated by `handler()`.
 */
// biome-ignore lint/complexity/noBannedTypes: We require an empty object type here
export class QueueController<E = Env, H extends Record<string, unknown> = {}> {
  private readonly _binding: KeysOfType<E, Queue>;
  private readonly _handlers = new Map<
    string,
    | EmptyQueueHandler<E>
    | ZodQueueHandler<E, z.ZodType>
    | PayloadQueueHandler<E, unknown>
  >();
  private readonly _schemas = new Map<string, z.ZodType>();

  /**
   * @param binding - Name of the queue binding on the environment object.
   */
  constructor(binding: NoInfer<KeysOfType<E, Queue>>) {
    this._binding = binding;
  }

  /**
   * Registers the handler for a message type, optionally with a Zod schema
   * that the payload is parsed with before the handler runs.
   *
   * Registering the same `type` again replaces the previous handler.
   *
   * @returns This controller, typed with the newly registered message type.
   */
  handler<T extends string>(
    type: T,
    handler: EmptyQueueHandler<E>,
  ): QueueController<E, H & Record<T, void>>;
  handler<T extends string, S extends z.ZodType>(
    type: T,
    schema: S,
    handler: ZodQueueHandler<E, S>,
  ): QueueController<E, H & Record<T, z.infer<S>>>;
  handler<T extends string, S extends z.ZodType>(
    type: T,
    schemaOrHandler: S | EmptyQueueHandler<E> | PayloadQueueHandler<E, unknown>,
    maybeHandler?: ZodQueueHandler<E, S> | EmptyQueueHandler<E>,
  ): QueueController<E, H & Record<T, S extends z.ZodType ? z.infer<S> : void>> {
    if (maybeHandler) {
      this._schemas.set(type, schemaOrHandler as S);
      this._handlers.set(type, maybeHandler);
      return this;
    }
    // A schema-less registration must not inherit a schema left behind by an
    // earlier registration of the same type.
    this._schemas.delete(type);
    this._handlers.set(
      type,
      schemaOrHandler as EmptyQueueHandler<E> | PayloadQueueHandler<E, unknown>,
    );
    return this;
  }

  /**
   * Queue consumer entry point. Messages are processed one at a time, in
   * batch order. A message is acknowledged once its handler completes; it is
   * retried after 30 seconds when no handler is registered for its type or
   * when schema parsing or the handler throws.
   */
  get process(): ExportedHandlerQueueHandler<E, QueueBody> {
    return async (batch, env, ctx) => {
      for (const message of batch.messages) {
        // A null/undefined body would throw on destructuring and fail the
        // whole batch; treat it like a message with no known type instead.
        const { type, payload } = (message.body ?? {}) as Partial<QueueBody>;
        const handler =
          typeof type === 'string' ? this._handlers.get(type) : undefined;
        if (!handler) {
          console.error(`No handler found for message type: ${type}`);
          message.retry({ delaySeconds: 30 });
          continue;
        }
        try {
          const schema = this._schemas.get(type as string);
          const validatedPayload = schema ? schema.parse(payload) : payload;
          await handler({
            env,
            executionCtx: ctx,
            message,
            payload: validatedPayload,
          });
          message.ack();
        } catch (error) {
          console.error(`Error processing message type ${type}:`, error);
          message.retry({ delaySeconds: 30 });
        }
      }
    };
  }

  /**
   * Typed producer: `controller.send.<type>(payload?, options?)` enqueues a
   * single `{ type, payload }` message on the bound queue.
   */
  get send(): SendMethods<H> {
    return new Proxy({} as SendMethods<H>, {
      get: (_, type) => {
        // Symbol lookups (inspection, coercion, ...) are not message types;
        // returning a sender for them could enqueue a message by accident.
        if (typeof type !== 'string') return undefined;
        return async (payload?: unknown, options?: QueueSendOptions) => {
          await this._queue().send({ type, payload }, options);
        };
      },
    });
  }

  /**
   * Typed batch producer: `controller.sendBatch.<type>(items)` enqueues one
   * `{ type, payload }` message per item, applying each item's own options.
   */
  get sendBatch(): SendBatchMethods<H> {
    return new Proxy({} as SendBatchMethods<H>, {
      get: (_, type) => {
        // See `send`: only string keys name message types.
        if (typeof type !== 'string') return undefined;
        return async (messages: Array<BatchMessageItem<unknown>>) => {
          // Nothing to enqueue, so skip the call to the queue entirely.
          if (messages.length === 0) return;
          await this._queue().sendBatch(
            messages.map((msg) => ({
              // Options are spread first so they can never overwrite `body`.
              ...msg.options,
              body: {
                type,
                payload: 'payload' in msg ? msg.payload : undefined,
              },
            })),
          );
        };
      },
    });
  }

  /**
   * Looks up the queue binding on the imported `env` object at call time.
   *
   * @throws TypeError when no binding exists under the configured name.
   */
  private _queue(): Queue {
    const binding = env[this._binding as keyof Env] as unknown as
      | Queue
      | undefined;
    if (!binding) {
      throw new TypeError(
        `Queue binding "${String(this._binding)}" was not found on env`,
      );
    }
    return binding;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment