Skip to content

Instantly share code, notes, and snippets.

@qpwo
Created March 1, 2025 22:48
Show Gist options
  • Save qpwo/84981af999517b305134731cf7bf6e2e to your computer and use it in GitHub Desktop.
Save qpwo/84981af999517b305134731cf7bf6e2e to your computer and use it in GitHub Desktop.
/** Runs different types of jobs through a Postgres-backed queue.
* Aims for exactly-once completion of each (jobType, key) pair.
*/
import 'dotenv/config'
import { Pool, PoolClient } from 'pg'
import { JobStatus } from 'shared'
/**
 * Snapshot of one row of `jobs_t` as returned by the queue's read methods.
 *
 * @typeParam TData - Shape of the payload the job was enqueued with.
 * @typeParam TResult - Shape of the value the job handler resolved with.
 */
type StatusObj<TData = any, TResult = any> = {
  /** Primary key of the job row. */
  jobId: string
  /** Name of the job type this job belongs to. */
  jobType: string
  /** Caller-supplied de-duplication key within the job type. */
  key: string
  /** Payload stored with the job (the `data` column, selected as `inputData`). */
  inputData: TData
  /** Current lifecycle state of the job. */
  status: JobStatus
  /** When the row was inserted. */
  createdAt: Date
  /** When the row was last modified by the queue. */
  updatedAt: Date
  /** When a worker picked the job up; absent while it has not started. */
  startedAt?: Date
  /** When the job finished; absent while it has not finished. */
  completedAt?: Date
  /** Failure description, present only when the job did not succeed. */
  errorMessage?: string
  /** Handler return value, present only when the job succeeded. */
  result?: TResult
}
/**
 * Function a worker runs for each job it picks up.
 *
 * @typeParam Pair - `[input, output]` tuple for the job type: the handler receives
 *   `Pair[0]` as `data` and must resolve with `Pair[1]`.
 */
type JobHandler<Pair extends [any, any]> = (params: {
  jobId: string
  key: string
  data: Pair[0]
}) => Promise<Pair[1]>
/**
 * Postgres-backed job queue with per-(jobType, key) de-duplication.
 *
 * Jobs live in `jobs_t`, job-type settings in `job_types_t`. Producers call `send`,
 * workers call `startWorkerWaitForever`, and one process should run
 * `startMaintenanceProcess` to expire stale rows. Workers are woken through the
 * `new_job` notification channel; waiters are woken through `job_updates`.
 *
 * @typeParam JobTypes - Union of the job type names handled by this queue.
 * @typeParam Jobs - Maps each job type to its `[input, output]` tuple.
 */
export class MyJobQueue<JobTypes extends string, Jobs extends Record<JobTypes, [any, any]>> {
  /** Statuses in which a job is neither waiting nor running; `waitUntilDone` stops on these. */
  private static readonly DONE_STATUSES: ReadonlySet<string> = new Set([
    'completed',
    'failed',
    'cancelled',
    'expiredFailed',
    'expiredCompleted',
  ])

  /** Shared connection pool. No explicit config is passed, so node-postgres uses its environment defaults. */
  private pool = new Pool({})

  /**
   * Runs `work` inside BEGIN/COMMIT on a dedicated pooled client.
   *
   * If `work` (or COMMIT) throws, the transaction is rolled back and the ORIGINAL
   * error is rethrown. A failing ROLLBACK is only logged, and the client is then
   * discarded instead of being returned to the pool in an unknown state.
   */
  private async withTransaction<T>(work: (client: PoolClient) => Promise<T>): Promise<T> {
    const client = await this.pool.connect()
    let discard: Error | undefined
    try {
      await client.query('BEGIN')
      const value = await work(client)
      await client.query('COMMIT')
      return value
    } catch (err) {
      try {
        await client.query('ROLLBACK')
      } catch (rollbackErr) {
        console.error('Error rolling back transaction:', rollbackErr)
        discard = rollbackErr instanceof Error ? rollbackErr : new Error(String(rollbackErr))
      }
      throw err
    } finally {
      // Passing an error tells the pool to destroy the connection rather than reuse it.
      client.release(discard)
    }
  }

  /**
   * Creates the `job_status` enum, both tables and their indexes if they do not exist yet.
   * Safe to call repeatedly; everything runs in a single transaction.
   */
  async createOrUpdateSchema(): Promise<void> {
    await this.withTransaction(async client => {
      await client.query(`
        DO $$ BEGIN
          IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'job_status') THEN
            CREATE TYPE job_status AS ENUM ('waiting', 'active', 'completed', 'failed', 'cancelled', 'expiredFailed', 'expiredCompleted');
          END IF;
        END $$;
        CREATE TABLE IF NOT EXISTS job_types_t (
          "jobType" TEXT PRIMARY KEY,
          "maxWaitingMs" INTEGER NOT NULL,
          "maxActiveMs" INTEGER NOT NULL,
          "failValidForMs" INTEGER NOT NULL,
          "successValidForMs" INTEGER NOT NULL,
          "maxParallel" INTEGER NOT NULL DEFAULT 0,
          "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
          "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );
        CREATE TABLE IF NOT EXISTS jobs_t (
          "jobId" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
          "jobType" TEXT NOT NULL REFERENCES job_types_t("jobType"),
          "key" TEXT NOT NULL,
          "data" JSONB NOT NULL DEFAULT '{}',
          "status" job_status NOT NULL DEFAULT 'waiting',
          "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
          "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
          "startedAt" TIMESTAMPTZ,
          "completedAt" TIMESTAMPTZ,
          "errorMessage" TEXT,
          "result" JSONB
        );
        CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs_t("status", "jobType", "createdAt");
        CREATE INDEX IF NOT EXISTS idx_jobs_job_type ON jobs_t("jobType", "status", "createdAt");
        DO $$ BEGIN
          IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_jobs_deduplication') THEN
            CREATE UNIQUE INDEX idx_jobs_deduplication ON jobs_t("jobType", "key") WHERE "status" IN ('waiting', 'active');
          END IF;
        END $$;
      `)
    })
  }

  /**
   * Registers a job type, or overwrites the settings of an existing one.
   *
   * @param maxWaitingMs - How long a job may stay `waiting` before maintenance fails it.
   * @param maxActiveMs - How long a job may stay `active` before it is failed as timed out.
   * @param failValidForMs - How long a `failed` job is kept before becoming `expiredFailed`.
   * @param successValidForMs - How long a `completed` job is reused by `send` and kept before becoming `expiredCompleted`.
   * @param maxParallel - Maximum number of simultaneously `active` jobs of this type; 0 means no limit.
   * @throws Error if `jobType` is empty or any numeric setting is not an integer.
   */
  async createOrUpdateJobType({
    jobType,
    maxWaitingMs,
    maxActiveMs,
    failValidForMs,
    successValidForMs,
    maxParallel = 0,
  }: {
    jobType: string
    maxWaitingMs: number
    maxActiveMs: number
    failValidForMs: number
    successValidForMs: number
    maxParallel?: number
  }): Promise<void> {
    // All of these are stored in INTEGER columns, so NaN/fractions would only fail later in the database.
    const settings = [maxWaitingMs, maxActiveMs, failValidForMs, successValidForMs, maxParallel]
    if (!jobType || settings.some(value => !Number.isInteger(value))) {
      throw new Error('Invalid job type parameters')
    }
    await this.pool.query(
      `INSERT INTO job_types_t ("jobType", "maxWaitingMs", "maxActiveMs", "failValidForMs", "successValidForMs", "maxParallel", "updatedAt")
       VALUES ($1, $2, $3, $4, $5, $6, NOW())
       ON CONFLICT ("jobType") DO UPDATE SET
         "maxWaitingMs" = $2, "maxActiveMs" = $3, "failValidForMs" = $4,
         "successValidForMs" = $5, "maxParallel" = $6, "updatedAt" = NOW()`,
      [jobType, maxWaitingMs, maxActiveMs, failValidForMs, successValidForMs, maxParallel],
    )
  }

  /**
   * Enqueues a job unless an equivalent one can be reused.
   *
   * Returns, in order of preference: the id of a job for this (jobType, key) that
   * completed within `successValidForMs`, the id of a job that is currently
   * waiting/active, or the id of a newly inserted job (workers are then notified).
   *
   * @throws Error if `jobType`/`key` is empty, the job type is not registered, or the
   *   job could not be enqueued after repeated concurrent conflicts.
   */
  async send<K extends JobTypes>({ jobType, key, data }: { jobType: K; key: string; data: Jobs[K][0] }): Promise<string> {
    if (!jobType || !key) throw new Error('Job type and key are required')
    const maxAttempts = 3
    return this.withTransaction(async client => {
      const jobTypeResult = await client.query('SELECT "successValidForMs" FROM job_types_t WHERE "jobType" = $1', [jobType])
      if (jobTypeResult.rowCount === 0) throw new Error(`Job type "${jobType}" is not defined`)
      const { successValidForMs } = jobTypeResult.rows[0]

      /** Id of a recently completed or still pending job for this pair, if any. */
      const findReusableJobId = async (): Promise<string | undefined> => {
        const recentSuccess = await client.query(
          `SELECT "jobId" FROM jobs_t
           WHERE "jobType" = $1 AND "key" = $2 AND "status" = 'completed'
             AND "completedAt" > NOW() - make_interval(0, 0, 0, 0, 0, 0, $3 / 1000.0)
           ORDER BY "completedAt" DESC LIMIT 1`,
          [jobType, key, successValidForMs],
        )
        if (recentSuccess.rows.length > 0) return recentSuccess.rows[0].jobId
        const pending = await client.query(
          `SELECT "jobId" FROM jobs_t WHERE "jobType" = $1 AND "key" = $2 AND "status" IN ('waiting', 'active') LIMIT 1`,
          [jobType, key],
        )
        return pending.rows.length > 0 ? pending.rows[0].jobId : undefined
      }

      for (let attempt = 0; attempt < maxAttempts; attempt++) {
        const reusable = await findReusableJobId()
        if (reusable !== undefined) return reusable
        // A concurrent send() may insert the same pair between the check above and this
        // statement. DO NOTHING turns that unique-index violation into "no row returned",
        // after which the loop re-checks and picks up the winner's job.
        const inserted = await client.query(
          `INSERT INTO jobs_t ("jobType", "key", "data") VALUES ($1, $2, $3)
           ON CONFLICT ("jobType", "key") WHERE "status" IN ('waiting', 'active') DO NOTHING
           RETURNING "jobId"`,
          // `undefined` would be sent as SQL NULL and violate NOT NULL, so store JSON null instead.
          [jobType, key, JSON.stringify(data ?? null)],
        )
        if (inserted.rows.length > 0) {
          // Delivered to listeners only when the surrounding transaction commits.
          await client.query(`SELECT pg_notify('new_job', $1)`, [jobType])
          return inserted.rows[0].jobId
        }
      }
      throw new Error(`Could not enqueue job "${jobType}"/"${key}" after ${maxAttempts} attempts`)
    })
  }

  /**
   * Loads the current state of one job.
   * @throws Error if no job with this id exists.
   */
  async getStatus<K extends JobTypes>({ jobId }: { jobId: string }): Promise<StatusObj<Jobs[K][0], Jobs[K][1]>> {
    const result = await this.pool.query(
      `SELECT "jobId", "jobType", "key", "data" as "inputData", "status", "createdAt", "updatedAt",
              "startedAt", "completedAt", "errorMessage", "result"
       FROM jobs_t WHERE "jobId" = $1`,
      [jobId],
    )
    if (result.rowCount === 0) throw new Error(`Job with ID "${jobId}" not found`)
    return result.rows[0]
  }

  /**
   * Resolves with the job's status once it is no longer waiting/active.
   *
   * Listens on `job_updates` for immediate wake-ups and additionally polls once per
   * second as a safety net. Rejects with `Wait timeout after <timeoutMs>ms` when the
   * job is still pending after `timeoutMs`, or with the underlying error if the status
   * cannot be read or the listening connection fails.
   */
  async waitUntilDone<K extends JobTypes>({
    jobId,
    timeoutMs,
  }: {
    jobId: string
    timeoutMs: number
  }): Promise<StatusObj<Jobs[K][0], Jobs[K][1]>> {
    // Fast path: do not tie up a dedicated connection for a job that is already finished.
    const initial = await this.getStatus<K>({ jobId })
    if (MyJobQueue.DONE_STATUSES.has(initial.status)) return initial

    const client = await this.pool.connect()
    try {
      await client.query('LISTEN job_updates')
    } catch (err) {
      client.release()
      throw err
    }
    const startTime = Date.now()
    const pollIntervalMs = 1000

    return new Promise<StatusObj<Jobs[K][0], Jobs[K][1]>>((resolve, reject) => {
      let settled = false
      let pollTimer: ReturnType<typeof setTimeout> | undefined

      // Tears everything down exactly once, then settles the promise via `settle`.
      const finish = (settle: () => void): void => {
        if (settled) return
        settled = true
        if (pollTimer !== undefined) clearTimeout(pollTimer)
        client.removeListener('notification', onNotification)
        const giveBack = (err?: Error): void => {
          client.removeListener('error', onError)
          client.release(err)
        }
        // Release only after UNLISTEN has finished so the pooled connection is not
        // handed to another caller while still subscribed; discard it if UNLISTEN fails.
        client.query('UNLISTEN job_updates').then(
          () => giveBack(),
          (err: unknown) => giveBack(err instanceof Error ? err : new Error(String(err))),
        )
        settle()
      }

      const poll = async (): Promise<void> => {
        if (settled) return
        try {
          const current = await this.getStatus<K>({ jobId })
          if (settled) return
          if (MyJobQueue.DONE_STATUSES.has(current.status)) {
            finish(() => resolve(current))
            return
          }
          const elapsed = Date.now() - startTime
          if (elapsed >= timeoutMs) {
            finish(() => reject(new Error(`Wait timeout after ${timeoutMs}ms`)))
            return
          }
          // Never sleep past the deadline; fall back to the normal interval otherwise.
          const remaining = timeoutMs - elapsed
          const delay = remaining < pollIntervalMs ? Math.max(remaining, 0) : pollIntervalMs
          pollTimer = setTimeout(() => void poll(), delay)
        } catch (err) {
          finish(() => reject(err))
        }
      }

      const onNotification = async (notification: { payload?: string }): Promise<void> => {
        if (settled || notification.payload !== jobId) return
        try {
          const current = await this.getStatus<K>({ jobId })
          if (MyJobQueue.DONE_STATUSES.has(current.status)) finish(() => resolve(current))
        } catch {
          /* continue waiting; the poll loop reports persistent errors */
        }
      }

      // A checked-out client has no error listener of its own; without this one a
      // dropped connection would surface as an unhandled 'error' event.
      const onError = (err: Error): void => finish(() => reject(err))

      client.on('notification', onNotification)
      client.on('error', onError)
      void poll()
    })
  }

  /**
   * Cancels a job that is still `waiting` and notifies waiters.
   * @returns The job's state after cancellation.
   * @throws Error if the job does not exist or is not in `waiting` status.
   */
  async cancel<K extends JobTypes>({ jobId }: { jobId: string }): Promise<StatusObj<Jobs[K][0], Jobs[K][1]>> {
    return this.withTransaction(async client => {
      const result = await client.query(
        `UPDATE jobs_t
         SET "status" = 'cancelled', "updatedAt" = NOW()
         WHERE "jobId" = $1 AND "status" = 'waiting'
         RETURNING "jobId", "jobType", "key", "data" as "inputData", "status", "createdAt",
                   "updatedAt", "startedAt", "completedAt", "errorMessage", "result"`,
        [jobId],
      )
      if (result.rowCount === 0) {
        // Nothing was updated: distinguish "unknown id" from "not cancellable any more".
        const jobCheck = await client.query('SELECT "status" FROM jobs_t WHERE "jobId" = $1', [jobId])
        if (jobCheck.rowCount === 0) throw new Error(`Job with ID "${jobId}" not found`)
        throw new Error(`Cannot cancel job with status "${jobCheck.rows[0].status}"`)
      }
      await client.query(`SELECT pg_notify('job_updates', $1)`, [jobId])
      return result.rows[0]
    })
  }

  /**
   * Lists jobs, newest first, optionally filtered by job type, status and key.
   * Falsy filter values (including an empty string) are ignored.
   *
   * @param limit - Maximum number of rows to return (default 100).
   * @param offset - Number of rows to skip (default 0).
   */
  async listJobs<K extends JobTypes>({
    jobType,
    status,
    key,
    limit = 100,
    offset = 0,
  }: {
    jobType?: K
    status?: JobStatus
    key?: string
    limit?: number
    offset?: number
  }): Promise<Array<StatusObj<Jobs[K][0], Jobs[K][1]>>> {
    const params: any[] = []
    const conditions: string[] = []
    if (jobType) {
      params.push(jobType)
      conditions.push(`"jobType" = $${params.length}`)
    }
    if (status) {
      params.push(status)
      conditions.push(`"status" = $${params.length}`)
    }
    if (key) {
      params.push(key)
      conditions.push(`"key" = $${params.length}`)
    }
    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''
    // LIMIT and OFFSET are always the last two placeholders.
    params.push(limit, offset)
    const result = await this.pool.query(
      `SELECT "jobId", "jobType", "key", "data" as "inputData", "status", "createdAt", "updatedAt",
              "startedAt", "completedAt", "errorMessage", "result"
       FROM jobs_t ${whereClause}
       ORDER BY "createdAt" DESC
       LIMIT $${params.length - 1} OFFSET $${params.length}`,
      params,
    )
    return result.rows
  }

  /**
   * Clears all jobs of a specific type from the queue
   * @param jobType The job type to clear
   * @param status Optional status filter to clear only specific job statuses
   * @returns The number of jobs cleared
   */
  async clearJobs<K extends JobTypes>({ jobType, status }: { jobType: K; status?: JobStatus }): Promise<number> {
    let query = `DELETE FROM jobs_t WHERE "jobType" = $1`
    const params: any[] = [jobType]
    if (status) {
      query += ` AND "status" = $2`
      params.push(status)
    }
    // A single DELETE is already atomic, so no explicit transaction is needed.
    const result = await this.pool.query(query, params)
    return result.rowCount ?? 0
  }

  /**
   * Removes a job type and all of its jobs
   * @param jobType The job type to remove
   * @returns The number of jobs deleted
   */
  async removeJobType<K extends JobTypes>({ jobType }: { jobType: K }): Promise<number> {
    return this.withTransaction(async client => {
      // Jobs reference the job type through a foreign key, so they have to go first.
      const deletedJobs = await client.query(`DELETE FROM jobs_t WHERE "jobType" = $1`, [jobType])
      await client.query(`DELETE FROM job_types_t WHERE "jobType" = $1`, [jobType])
      return deletedJobs.rowCount ?? 0
    })
  }

  /**
   * Starts a worker for `jobType` and keeps it running.
   *
   * Processes any job that is already waiting, then reacts to `new_job` notifications
   * on a dedicated connection. The returned promise never resolves; it rejects if the
   * arguments are invalid, the job type is not registered, or the listening connection
   * fails (so the caller can restart the worker).
   */
  async startWorkerWaitForever<K extends JobTypes>(jobType: K, handleJob: JobHandler<Jobs[K]>): Promise<never> {
    if (!jobType || typeof handleJob !== 'function') throw new Error('Job type and handler function are required')
    const client = await this.pool.connect()
    try {
      const jobTypeResult = await client.query('SELECT 1 FROM job_types_t WHERE "jobType" = $1', [jobType])
      if (jobTypeResult.rowCount === 0) throw new Error(`Job type "${jobType}" is not defined`)
      await client.query('LISTEN new_job')
    } catch (err) {
      client.release()
      throw err
    }
    return new Promise<never>((_resolve, reject) => {
      client.on('notification', (notification: { payload?: string }) => {
        if (notification.payload === jobType) void this.processNextJob(jobType, handleJob)
      })
      // Without a listener a dropped connection would be an unhandled 'error' event.
      client.on('error', (err: Error) => {
        client.release(err)
        reject(err)
      })
      // Pick up work that was enqueued before this worker started listening.
      void this.processNextJob(jobType, handleJob)
    })
  }

  /**
   * Claims one waiting job (if allowed by `maxParallel`), runs the handler, records the
   * outcome and then immediately tries the next job. Never rejects: any error is logged
   * and the attempt is repeated after one second.
   */
  private async processNextJob<K extends JobTypes>(jobType: K, handleJob: JobHandler<Jobs[K]>) {
    try {
      const claimed = await this.claimNextJob(jobType)
      if (claimed === null) return
      const { jobId, key, data, maxActiveMs } = claimed

      // The handler itself cannot be interrupted; on timeout the row is marked failed
      // and whatever the handler reports later is ignored by the status guard.
      const jobTimeout = setTimeout(() => {
        this.settleActiveJob(jobId, { status: 'failed', errorMessage: `Job timed out after ${maxActiveMs}ms` }).catch(err =>
          console.error('Error handling job timeout:', err),
        )
      }, maxActiveMs)

      try {
        const result = await handleJob({ jobId, key, data })
        clearTimeout(jobTimeout)
        await this.settleActiveJob(jobId, { status: 'completed', result })
      } catch (err) {
        clearTimeout(jobTimeout)
        const errorMessage = err instanceof Error ? err.message : String(err)
        await this.settleActiveJob(jobId, { status: 'failed', errorMessage })
      }

      // Keep draining the queue without waiting for another notification.
      void this.processNextJob(jobType, handleJob)
    } catch (err) {
      console.error('Error processing job:', err)
      // Retry after a delay
      setTimeout(() => void this.processNextJob(jobType, handleJob), 1000)
    }
  }

  /**
   * Atomically moves the oldest waiting job of `jobType` to `active`.
   *
   * The job type row is locked for the duration of the transaction so that concurrent
   * workers evaluate the `maxParallel` limit one at a time.
   *
   * @returns The claimed job plus its timeout, or `null` when the limit is reached or
   *   nothing is waiting.
   * @throws Error if the job type does not exist.
   */
  private claimNextJob<K extends JobTypes>(
    jobType: K,
  ): Promise<{ jobId: string; key: string; data: Jobs[K][0]; maxActiveMs: number } | null> {
    return this.withTransaction(async client => {
      const jobTypeResult = await client.query(
        `SELECT "maxActiveMs", "maxParallel" FROM job_types_t WHERE "jobType" = $1 FOR UPDATE`,
        [jobType],
      )
      if (jobTypeResult.rowCount === 0) {
        throw new Error(`Job type "${jobType}" not found`)
      }
      const { maxActiveMs, maxParallel } = jobTypeResult.rows[0]

      if (maxParallel > 0) {
        const activeCountResult = await client.query(
          `SELECT COUNT(*) FROM jobs_t WHERE "jobType" = $1 AND "status" = 'active'`,
          [jobType],
        )
        // COUNT(*) is a bigint, which node-postgres returns as a string.
        const activeCount = Number.parseInt(activeCountResult.rows[0].count, 10)
        if (activeCount >= maxParallel) return null
      }

      const jobResult = await client.query(
        `SELECT "jobId", "key", "data" FROM jobs_t
         WHERE "jobType" = $1 AND "status" = 'waiting'
         ORDER BY "createdAt" LIMIT 1 FOR UPDATE SKIP LOCKED`,
        [jobType],
      )
      if (jobResult.rows.length === 0) return null
      const { jobId, key, data } = jobResult.rows[0]

      await client.query(
        `UPDATE jobs_t SET "status" = 'active', "startedAt" = NOW(), "updatedAt" = NOW() WHERE "jobId" = $1`,
        [jobId],
      )
      return { jobId, key, data, maxActiveMs }
    })
  }

  /**
   * Records the outcome of a job that is still `active` and notifies waiters.
   *
   * The update is a no-op when the job has already left `active` (e.g. it timed out).
   * `completedAt` is set for failures too, because the maintenance sweep relies on it to
   * expire failed jobs. Uses `pool.query`, so no client can be leaked on error.
   */
  private async settleActiveJob(
    jobId: string,
    outcome: { status: 'completed'; result: unknown } | { status: 'failed'; errorMessage: string },
  ): Promise<void> {
    if (outcome.status === 'completed') {
      await this.pool.query(
        `UPDATE jobs_t SET "status" = 'completed', "completedAt" = NOW(),
           "updatedAt" = NOW(), "result" = $2 WHERE "jobId" = $1 AND "status" = 'active'`,
        [jobId, JSON.stringify(outcome.result)],
      )
    } else {
      await this.pool.query(
        `UPDATE jobs_t SET "status" = 'failed', "completedAt" = NOW(),
           "updatedAt" = NOW(), "errorMessage" = $2 WHERE "jobId" = $1 AND "status" = 'active'`,
        [jobId, outcome.errorMessage],
      )
    }
    await this.pool.query(`SELECT pg_notify('job_updates', $1)`, [jobId])
  }

  /**
   * Periodically expires stale jobs, forever.
   *
   * Each sweep fails jobs that waited or ran longer than their job type allows, then
   * re-labels `failed`/`completed` jobs whose validity window has passed as
   * `expiredFailed`/`expiredCompleted`. A failing sweep is logged and retried on the
   * next tick instead of ending the loop.
   *
   * @param intervalMs - Pause between sweeps in milliseconds (default 60000).
   */
  async startMaintenanceProcess({ intervalMs = 60000 }: { intervalMs?: number } = {}): Promise<never> {
    while (true) {
      try {
        await this.withTransaction(async client => {
          // "completedAt" is stamped on the two expiry failures so that the
          // 'failed' -> 'expiredFailed' step below can eventually match them.
          await client.query(`
            UPDATE jobs_t SET "status" = 'failed', "updatedAt" = NOW(), "completedAt" = NOW(), "errorMessage" = 'Job expired in waiting status'
            WHERE "status" = 'waiting' AND "createdAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "maxWaitingMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
            UPDATE jobs_t SET "status" = 'failed', "updatedAt" = NOW(), "completedAt" = NOW(), "errorMessage" = 'Job expired in active status'
            WHERE "status" = 'active' AND "startedAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "maxActiveMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
            UPDATE jobs_t SET "status" = 'expiredFailed', "updatedAt" = NOW()
            WHERE "status" = 'failed' AND "completedAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "failValidForMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
            UPDATE jobs_t SET "status" = 'expiredCompleted', "updatedAt" = NOW()
            WHERE "status" = 'completed' AND "completedAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "successValidForMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
          `)
        })
      } catch (err) {
        console.error('Error in maintenance process:', err)
      }
      await new Promise(resolve => setTimeout(resolve, intervalMs))
    }
  }
}
// Re-export the job-related types so consumers can import them from this module.
export { type JobStatus, type StatusObj, type JobHandler }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment