Created
March 1, 2025 22:48
-
-
Save qpwo/84981af999517b305134731cf7bf6e2e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Runs different types of jobs through a Postgres-backed queue.
 * Aims for exactly-once completion of each (jobType, key) pair.
 */
import 'dotenv/config' | |
import { Pool, PoolClient } from 'pg' | |
import { JobStatus } from 'shared' | |
/**
 * Snapshot of one row of jobs_t, as returned by getStatus, cancel and listJobs.
 *
 * The nullable columns are read straight from Postgres, where node-postgres reports SQL NULL as `null`.
 * They are therefore typed `T | null` in addition to being optional.
 *
 * @typeParam TData - Shape of the job's input ("data" column, exposed as `inputData`).
 * @typeParam TResult - Shape of the handler's return value ("result" column).
 */
type StatusObj<TData = any, TResult = any> = {
  jobId: string
  jobType: string
  /** Deduplication key supplied to `send`. */
  key: string
  /** Input payload supplied to `send`. */
  inputData: TData
  status: JobStatus
  createdAt: Date
  updatedAt: Date
  /** When a worker marked the job 'active'; null while still waiting. */
  startedAt?: Date | null
  /** When the job finished; null until then. */
  completedAt?: Date | null
  /** Failure reason for failed jobs; null otherwise. */
  errorMessage?: string | null
  /** Handler return value for completed jobs; null otherwise. */
  result?: TResult | null
}
/**
 * Worker callback for a single job.
 * It receives the job id, its deduplication key and its input, and resolves with the job's result.
 *
 * @typeParam TPair - The job type's `[input, result]` tuple.
 */
type JobHandler<TPair extends [any, any]> = (job: {
  jobId: string
  key: string
  data: TPair[0]
}) => Promise<TPair[1]>
/**
 * Postgres-backed job queue.
 *
 * Jobs are deduplicated per (jobType, key):
 * - at most one 'waiting'/'active' job can exist per pair (partial unique index idx_jobs_deduplication);
 * - `send` reuses a 'completed' job that is still inside its type's successValidForMs window.
 *
 * @typeParam JobTypes - Union of job type names.
 * @typeParam Jobs - Maps each job type to its `[input, result]` tuple.
 */
export class MyJobQueue<JobTypes extends string, Jobs extends Record<JobTypes, [any, any]>> {
  /** Statuses in which a job is no longer waiting or running; `waitUntilDone` resolves on any of them. */
  private static readonly terminalStatuses: ReadonlySet<string> = new Set([
    'completed',
    'failed',
    'cancelled',
    'expiredFailed',
    'expiredCompleted',
  ])

  /** Columns selected whenever a jobs_t row is returned as a StatusObj ("data" is exposed as "inputData"). */
  private static readonly statusColumns = `"jobId", "jobType", "key", "data" as "inputData", "status", "createdAt", "updatedAt",
      "startedAt", "completedAt", "errorMessage", "result"`

  /**
   * Shared connection pool.
   * It is built with an empty config, so connection settings presumably come from the PG* environment variables.
   * dotenv is loaded by this module; confirm the variables for your deployment.
   */
  private readonly pool = MyJobQueue.createPool()

  /**
   * Builds the pool and attaches an 'error' listener.
   * node-postgres emits 'error' on the pool when an idle client's connection drops.
   * With no listener, that event would crash the process.
   */
  private static createPool(): Pool {
    const pool = new Pool({})
    pool.on('error', err => console.error('Unexpected error on idle Postgres client:', err))
    return pool
  }

  /** True when `status` is one of the terminal statuses listed in `terminalStatuses`. */
  private static isTerminalStatus(status: JobStatus): boolean {
    return MyJobQueue.terminalStatuses.has(status)
  }

  /**
   * Runs `work` inside BEGIN/COMMIT on a dedicated pool client.
   * On any error it rolls back and rethrows. If the ROLLBACK itself fails, the connection is unusable;
   * it is released with that error so the pool discards it instead of handing it out again.
   */
  private async withTransaction<T>(work: (client: PoolClient) => Promise<T>): Promise<T> {
    const client = await this.pool.connect()
    let releaseError: Error | undefined
    try {
      await client.query('BEGIN')
      const value = await work(client)
      await client.query('COMMIT')
      return value
    } catch (err) {
      try {
        await client.query('ROLLBACK')
      } catch (rollbackErr) {
        console.error('Error rolling back transaction:', rollbackErr)
        releaseError = rollbackErr instanceof Error ? rollbackErr : new Error(String(rollbackErr))
      }
      throw err
    } finally {
      client.release(releaseError)
    }
  }

  /**
   * Creates the job_status enum, job_types_t, jobs_t and their indexes when they are missing.
   * Existing objects are left untouched (no migrations are applied). Runs in a single transaction.
   */
  async createOrUpdateSchema(): Promise<void> {
    await this.withTransaction(async client => {
      await client.query(`
        DO $$ BEGIN
          IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'job_status') THEN
            CREATE TYPE job_status AS ENUM ('waiting', 'active', 'completed', 'failed', 'cancelled', 'expiredFailed', 'expiredCompleted');
          END IF;
        END $$;
        CREATE TABLE IF NOT EXISTS job_types_t (
          "jobType" TEXT PRIMARY KEY,
          "maxWaitingMs" INTEGER NOT NULL,
          "maxActiveMs" INTEGER NOT NULL,
          "failValidForMs" INTEGER NOT NULL,
          "successValidForMs" INTEGER NOT NULL,
          "maxParallel" INTEGER NOT NULL DEFAULT 0,
          "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
          "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );
        CREATE TABLE IF NOT EXISTS jobs_t (
          "jobId" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
          "jobType" TEXT NOT NULL REFERENCES job_types_t("jobType"),
          "key" TEXT NOT NULL,
          "data" JSONB NOT NULL DEFAULT '{}',
          "status" job_status NOT NULL DEFAULT 'waiting',
          "createdAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
          "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
          "startedAt" TIMESTAMPTZ,
          "completedAt" TIMESTAMPTZ,
          "errorMessage" TEXT,
          "result" JSONB
        );
        CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs_t("status", "jobType", "createdAt");
        CREATE INDEX IF NOT EXISTS idx_jobs_job_type ON jobs_t("jobType", "status", "createdAt");
        DO $$ BEGIN
          IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_jobs_deduplication') THEN
            CREATE UNIQUE INDEX idx_jobs_deduplication ON jobs_t("jobType", "key") WHERE "status" IN ('waiting', 'active');
          END IF;
        END $$;
      `)
    })
  }

  /**
   * Inserts a job type, or overwrites the limits of an existing one.
   *
   * @param maxWaitingMs - Age after which a still-waiting job is failed by maintenance.
   * @param maxActiveMs - Running time after which an active job is failed.
   * @param failValidForMs - Time after completion before a failed job becomes 'expiredFailed'.
   * @param successValidForMs - Window in which `send` reuses a completed job; afterwards it becomes 'expiredCompleted'.
   * @param maxParallel - Maximum concurrently active jobs of this type; 0 (default) means unlimited.
   * @throws Error('Invalid job type parameters') when `jobType` is empty or a duration is not a non-negative integer.
   *   The duration columns are INTEGER, so non-integers would fail in Postgres anyway.
   */
  async createOrUpdateJobType({
    jobType,
    maxWaitingMs,
    maxActiveMs,
    failValidForMs,
    successValidForMs,
    maxParallel = 0,
  }: {
    jobType: string
    maxWaitingMs: number
    maxActiveMs: number
    failValidForMs: number
    successValidForMs: number
    maxParallel?: number
  }): Promise<void> {
    const isNonNegativeInteger = (value: number) => Number.isInteger(value) && value >= 0
    if (
      !jobType ||
      ![maxWaitingMs, maxActiveMs, failValidForMs, successValidForMs].every(isNonNegativeInteger) ||
      !Number.isInteger(maxParallel)
    ) {
      throw new Error('Invalid job type parameters')
    }
    await this.pool.query(
      `INSERT INTO job_types_t ("jobType", "maxWaitingMs", "maxActiveMs", "failValidForMs", "successValidForMs", "maxParallel", "updatedAt")
       VALUES ($1, $2, $3, $4, $5, $6, NOW())
       ON CONFLICT ("jobType") DO UPDATE SET
         "maxWaitingMs" = $2, "maxActiveMs" = $3, "failValidForMs" = $4,
         "successValidForMs" = $5, "maxParallel" = $6, "updatedAt" = NOW()`,
      [jobType, maxWaitingMs, maxActiveMs, failValidForMs, successValidForMs, maxParallel],
    )
  }

  /**
   * Enqueues a job and returns its id, deduplicating on (jobType, key).
   *
   * The id returned is chosen in this order:
   * 1. a 'completed' job inside the type's successValidForMs window;
   * 2. the current 'waiting'/'active' job for the pair;
   * 3. a newly inserted job (which also notifies 'new_job' listeners).
   *
   * @throws Error when `jobType`/`key` is empty or the job type is not defined.
   */
  async send<K extends JobTypes>({ jobType, key, data }: { jobType: K; key: string; data: Jobs[K][0] }): Promise<string> {
    if (!jobType || !key) throw new Error('Job type and key are required')
    return this.withTransaction(async client => {
      const jobTypeResult = await client.query('SELECT "successValidForMs" FROM job_types_t WHERE "jobType" = $1', [jobType])
      if (jobTypeResult.rowCount === 0) throw new Error(`Job type "${jobType}" is not defined`)
      const { successValidForMs } = jobTypeResult.rows[0]

      const recentSuccess = await client.query(
        `SELECT "jobId" FROM jobs_t
         WHERE "jobType" = $1 AND "key" = $2 AND "status" = 'completed'
         AND "completedAt" > NOW() - make_interval(0, 0, 0, 0, 0, 0, $3 / 1000.0)
         ORDER BY "completedAt" DESC LIMIT 1`,
        [jobType, key, successValidForMs],
      )
      if ((recentSuccess.rowCount ?? 0) > 0) return recentSuccess.rows[0].jobId

      // ON CONFLICT against the partial unique index makes concurrent sends for the same pair converge on one row.
      // Without it, the losing send fails with a unique violation instead of returning the existing job.
      const inserted = await client.query(
        `INSERT INTO jobs_t ("jobType", "key", "data") VALUES ($1, $2, $3)
         ON CONFLICT ("jobType", "key") WHERE "status" IN ('waiting', 'active') DO NOTHING
         RETURNING "jobId"`,
        [jobType, key, JSON.stringify(data)],
      )
      if ((inserted.rowCount ?? 0) > 0) {
        await client.query(`SELECT pg_notify('new_job', $1)`, [jobType])
        return inserted.rows[0].jobId
      }

      // A waiting/active job already exists.
      // Prefer it; fall back to the newest job in case it finished in the meantime.
      const existing = await client.query(
        `SELECT "jobId" FROM jobs_t WHERE "jobType" = $1 AND "key" = $2
         ORDER BY ("status" IN ('waiting', 'active')) DESC, "createdAt" DESC LIMIT 1`,
        [jobType, key],
      )
      if ((existing.rowCount ?? 0) === 0) {
        throw new Error(`Could not enqueue job "${jobType}" with key "${key}": conflicting job disappeared`)
      }
      return existing.rows[0].jobId
    })
  }

  /**
   * Reads the current StatusObj for `jobId`.
   * @throws Error when no job with that id exists.
   */
  async getStatus<K extends JobTypes>({ jobId }: { jobId: string }): Promise<StatusObj<Jobs[K][0], Jobs[K][1]>> {
    const result = await this.pool.query(`SELECT ${MyJobQueue.statusColumns} FROM jobs_t WHERE "jobId" = $1`, [jobId])
    if (result.rowCount === 0) throw new Error(`Job with ID "${jobId}" not found`)
    return result.rows[0]
  }

  /**
   * Resolves with the job's StatusObj once it reaches a terminal status.
   *
   * Waiting uses LISTEN 'job_updates' for fast wake-ups, plus a 1s poll as a fallback for missed notifications.
   * The dedicated listening connection is released exactly once, after UNLISTEN completes.
   *
   * @throws Error(`Wait timeout after ${timeoutMs}ms`) if the job is still running after `timeoutMs`.
   *   Also rejects if a status read fails during a poll.
   */
  async waitUntilDone<K extends JobTypes>({
    jobId,
    timeoutMs,
  }: {
    jobId: string
    timeoutMs: number
  }): Promise<StatusObj<Jobs[K][0], Jobs[K][1]>> {
    // Fast path: no listening connection is needed for a job that has already finished.
    const initial = await this.getStatus<K>({ jobId })
    if (MyJobQueue.isTerminalStatus(initial.status)) return initial

    const client = await this.pool.connect()
    // A checked-out client with no 'error' listener would crash the process if its connection dropped.
    // Record the error so the client is discarded on release.
    let connectionError: Error | undefined
    const onClientError = (err: Error) => {
      connectionError = err
    }
    client.on('error', onClientError)
    try {
      await client.query('LISTEN job_updates')
    } catch (err) {
      client.removeListener('error', onClientError)
      client.release(err instanceof Error ? err : true)
      throw err
    }

    const deadline = Date.now() + timeoutMs
    const pollIntervalMs = 1000

    return new Promise((resolve, reject) => {
      let settled = false
      let pollTimer: ReturnType<typeof setTimeout> | undefined

      const releaseClient = (err?: Error) => {
        client.removeListener('error', onClientError)
        client.release(err ?? connectionError)
      }

      // Idempotent teardown: stop polling and listening, then hand the connection back exactly once.
      const settle = (outcome: () => void) => {
        if (settled) return
        settled = true
        if (pollTimer !== undefined) clearTimeout(pollTimer)
        client.removeListener('notification', onNotification)
        // Release only after UNLISTEN finishes, so the pooled connection is never reused while still subscribed.
        client.query('UNLISTEN job_updates').then(
          () => releaseClient(),
          (err: unknown) => releaseClient(err instanceof Error ? err : new Error(String(err))),
        )
        outcome()
      }

      const poll = async () => {
        pollTimer = undefined
        try {
          const statusObj = await this.getStatus<K>({ jobId })
          // A notification may have settled the wait while this read was in flight.
          if (settled) return
          if (MyJobQueue.isTerminalStatus(statusObj.status)) {
            settle(() => resolve(statusObj))
            return
          }
          const now = Date.now()
          if (now > deadline) {
            settle(() => reject(new Error(`Wait timeout after ${timeoutMs}ms`)))
            return
          }
          // Never sleep past the deadline, so the timeout is detected promptly.
          const remaining = deadline - now
          const delay = remaining < pollIntervalMs ? remaining + 1 : pollIntervalMs
          pollTimer = setTimeout(() => void poll(), delay)
        } catch (err) {
          settle(() => reject(err))
        }
      }

      const onNotification = (message: { payload?: string }) => {
        if (settled || message.payload !== jobId) return
        this.getStatus<K>({ jobId }).then(
          statusObj => {
            if (MyJobQueue.isTerminalStatus(statusObj.status)) settle(() => resolve(statusObj))
          },
          () => {
            /* transient read failure: keep waiting, the poll will retry */
          },
        )
      }

      client.on('notification', onNotification)
      void poll()
    })
  }

  /**
   * Cancels a job that is still 'waiting' and notifies 'job_updates' listeners.
   * @throws Error when the job does not exist or is not in 'waiting' status.
   */
  async cancel<K extends JobTypes>({ jobId }: { jobId: string }): Promise<StatusObj<Jobs[K][0], Jobs[K][1]>> {
    return this.withTransaction(async client => {
      const result = await client.query(
        `UPDATE jobs_t
         SET "status" = 'cancelled', "updatedAt" = NOW()
         WHERE "jobId" = $1 AND "status" = 'waiting'
         RETURNING ${MyJobQueue.statusColumns}`,
        [jobId],
      )
      if (result.rowCount === 0) {
        const jobCheck = await client.query('SELECT "status" FROM jobs_t WHERE "jobId" = $1', [jobId])
        if (jobCheck.rowCount === 0) throw new Error(`Job with ID "${jobId}" not found`)
        throw new Error(`Cannot cancel job with status "${jobCheck.rows[0].status}"`)
      }
      await client.query(`SELECT pg_notify('job_updates', $1)`, [jobId])
      return result.rows[0]
    })
  }

  /**
   * Lists jobs newest-first, optionally filtered by type, status and key, with LIMIT/OFFSET paging.
   * Defaults: limit 100, offset 0.
   */
  async listJobs<K extends JobTypes>({
    jobType,
    status,
    key,
    limit = 100,
    offset = 0,
  }: {
    jobType?: K
    status?: JobStatus
    key?: string
    limit?: number
    offset?: number
  }): Promise<Array<StatusObj<Jobs[K][0], Jobs[K][1]>>> {
    const params: unknown[] = []
    const conditions: string[] = []
    if (jobType) {
      params.push(jobType)
      conditions.push(`"jobType" = $${params.length}`)
    }
    if (status) {
      params.push(status)
      conditions.push(`"status" = $${params.length}`)
    }
    if (key) {
      params.push(key)
      conditions.push(`"key" = $${params.length}`)
    }
    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''
    params.push(limit, offset)
    const result = await this.pool.query(
      `SELECT ${MyJobQueue.statusColumns}
       FROM jobs_t ${whereClause}
       ORDER BY "createdAt" DESC
       LIMIT $${params.length - 1} OFFSET $${params.length}`,
      params,
    )
    return result.rows
  }

  /**
   * Deletes all jobs of `jobType`, optionally only those in `status`.
   * @returns The number of jobs deleted.
   */
  async clearJobs<K extends JobTypes>({ jobType, status }: { jobType: K; status?: JobStatus }): Promise<number> {
    const params: unknown[] = [jobType]
    let query = `DELETE FROM jobs_t WHERE "jobType" = $1`
    if (status) {
      query += ` AND "status" = $2`
      params.push(status)
    }
    const result = await this.withTransaction(client => client.query(query, params))
    return result.rowCount ?? 0
  }

  /**
   * Deletes every job of `jobType`, then the job type itself, atomically.
   * @returns The number of jobs deleted.
   */
  async removeJobType<K extends JobTypes>({ jobType }: { jobType: K }): Promise<number> {
    return this.withTransaction(async client => {
      // Jobs reference job_types_t, so they must go first.
      const deleteJobsResult = await client.query(`DELETE FROM jobs_t WHERE "jobType" = $1`, [jobType])
      await client.query(`DELETE FROM job_types_t WHERE "jobType" = $1`, [jobType])
      return deleteJobsResult.rowCount ?? 0
    })
  }

  /**
   * Starts a worker for `jobType` that processes jobs until the process exits.
   *
   * It holds one pooled connection for LISTEN 'new_job'. Every matching notification, plus one initial kick,
   * starts a processing loop.
   * The returned promise never resolves. It rejects if the listening connection errors, because the worker
   * would otherwise stop hearing about new jobs silently.
   *
   * @throws Error when arguments are missing or the job type is not defined.
   */
  async startWorkerWaitForever<K extends JobTypes>(jobType: K, handleJob: JobHandler<Jobs[K]>): Promise<never> {
    if (!jobType || typeof handleJob !== 'function') throw new Error('Job type and handler function are required')
    const client = await this.pool.connect()
    try {
      const jobTypeResult = await client.query('SELECT 1 FROM job_types_t WHERE "jobType" = $1', [jobType])
      if (jobTypeResult.rowCount === 0) throw new Error(`Job type "${jobType}" is not defined`)
      await client.query('LISTEN new_job')
    } catch (err) {
      client.release()
      throw err
    }
    client.on('notification', notification => {
      if (notification.payload === jobType) void this.processNextJob(jobType, handleJob)
    })
    void this.processNextJob(jobType, handleJob)
    return new Promise<never>((_, reject) => {
      let failed = false
      client.on('error', (err: Error) => {
        if (failed) return
        failed = true
        client.release(err)
        reject(err)
      })
    })
  }

  /**
   * Claims and runs jobs of `jobType` one after another until none can be claimed.
   * Never rejects: failures are logged and the loop is retried after 1s.
   */
  private async processNextJob<K extends JobTypes>(jobType: K, handleJob: JobHandler<Jobs[K]>): Promise<void> {
    try {
      const job = await this.claimNextJob(jobType)
      // Nothing claimable (parallelism limit reached or queue empty).
      // A later notification or a finishing job restarts the loop.
      if (!job) return
      await this.runClaimedJob<K>(job, handleJob)
      void this.processNextJob(jobType, handleJob)
    } catch (err) {
      console.error('Error processing job:', err)
      setTimeout(() => void this.processNextJob(jobType, handleJob), 1000)
    }
  }

  /**
   * Atomically moves the oldest waiting job of `jobType` to 'active', honouring "maxParallel".
   * @returns The claimed job plus its type's maxActiveMs, or null when nothing may be claimed right now.
   */
  private async claimNextJob<K extends JobTypes>(
    jobType: K,
  ): Promise<{ jobId: string; key: string; data: Jobs[K][0]; maxActiveMs: number } | null> {
    return this.withTransaction(async client => {
      // Locking the job type row serializes claimers of this type, so the active-count check cannot race.
      const jobTypeResult = await client.query(
        `SELECT "maxActiveMs", "maxParallel" FROM job_types_t WHERE "jobType" = $1 FOR UPDATE`,
        [jobType],
      )
      if (jobTypeResult.rowCount === 0) throw new Error(`Job type "${jobType}" not found`)
      const { maxActiveMs, maxParallel } = jobTypeResult.rows[0]

      if (maxParallel > 0) {
        const activeCountResult = await client.query(
          `SELECT COUNT(*) FROM jobs_t WHERE "jobType" = $1 AND "status" = 'active'`,
          [jobType],
        )
        // COUNT(*) is a bigint, which node-postgres hands back as a string.
        if (parseInt(activeCountResult.rows[0].count, 10) >= maxParallel) return null
      }

      const jobResult = await client.query(
        `SELECT "jobId", "key", "data" FROM jobs_t
         WHERE "jobType" = $1 AND "status" = 'waiting'
         ORDER BY "createdAt" LIMIT 1 FOR UPDATE SKIP LOCKED`,
        [jobType],
      )
      if (jobResult.rowCount === 0) return null

      const { jobId, key, data } = jobResult.rows[0]
      await client.query(`UPDATE jobs_t SET "status" = 'active', "startedAt" = NOW(), "updatedAt" = NOW() WHERE "jobId" = $1`, [
        jobId,
      ])
      return { jobId, key, data, maxActiveMs }
    })
  }

  /**
   * Runs `handleJob` for a claimed job and records the outcome.
   *
   * A timer fails the job after maxActiveMs if the handler has not finished yet.
   * Every outcome update is guarded by `"status" = 'active'`, so whichever lands first wins.
   * Rejects only if recording a handler failure itself fails.
   */
  private async runClaimedJob<K extends JobTypes>(
    job: { jobId: string; key: string; data: Jobs[K][0]; maxActiveMs: number },
    handleJob: JobHandler<Jobs[K]>,
  ): Promise<void> {
    const { jobId, key, data, maxActiveMs } = job
    const jobTimeout = setTimeout(() => {
      this.recordOutcome(
        jobId,
        `UPDATE jobs_t SET "status" = 'failed', "completedAt" = NOW(), "updatedAt" = NOW(), "errorMessage" = $2
         WHERE "jobId" = $1 AND "status" = 'active'`,
        [`Job timed out after ${maxActiveMs}ms`],
      ).catch(err => console.error('Error handling job timeout:', err))
    }, maxActiveMs)

    try {
      const result = await handleJob({ jobId, key, data })
      clearTimeout(jobTimeout)
      await this.recordOutcome(
        jobId,
        `UPDATE jobs_t SET "status" = 'completed', "completedAt" = NOW(),
         "updatedAt" = NOW(), "result" = $2 WHERE "jobId" = $1 AND "status" = 'active'`,
        [JSON.stringify(result)],
      )
    } catch (err) {
      // Also reached when storing the result fails (e.g. an unserializable result); the job is then marked failed.
      clearTimeout(jobTimeout)
      const errorMessage = err instanceof Error ? err.message : String(err)
      await this.recordOutcome(
        jobId,
        `UPDATE jobs_t SET "status" = 'failed', "completedAt" = NOW(),
         "updatedAt" = NOW(), "errorMessage" = $2 WHERE "jobId" = $1 AND "status" = 'active'`,
        [errorMessage],
      )
    }
  }

  /**
   * Executes `sql` with `jobId` as $1, followed by `params`, then notifies 'job_updates' listeners.
   * The connection is always returned to the pool, even when a query throws.
   */
  private async recordOutcome(jobId: string, sql: string, params: unknown[]): Promise<void> {
    const client = await this.pool.connect()
    try {
      await client.query(sql, [jobId, ...params])
      await client.query(`SELECT pg_notify('job_updates', $1)`, [jobId])
    } finally {
      client.release()
    }
  }

  /**
   * Loops forever, running one expiry pass every `intervalMs`. Each pass:
   * - fails jobs that waited longer than maxWaitingMs or ran longer than maxActiveMs;
   * - moves failed/completed jobs older than failValidForMs/successValidForMs to 'expiredFailed'/'expiredCompleted'.
   *
   * Failures are stamped with "completedAt" so they can later expire.
   * Errors in a pass, including failing to get a connection, are logged and the loop continues.
   *
   * @param intervalMs - Delay between passes; defaults to 60000.
   */
  async startMaintenanceProcess(intervalMs = 60000): Promise<never> {
    while (true) {
      try {
        await this.withTransaction(client =>
          client.query(`
            UPDATE jobs_t SET "status" = 'failed', "completedAt" = NOW(), "updatedAt" = NOW(), "errorMessage" = 'Job expired in waiting status'
            WHERE "status" = 'waiting' AND "createdAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "maxWaitingMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
            UPDATE jobs_t SET "status" = 'failed', "completedAt" = NOW(), "updatedAt" = NOW(), "errorMessage" = 'Job expired in active status'
            WHERE "status" = 'active' AND "startedAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "maxActiveMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
            UPDATE jobs_t SET "status" = 'expiredFailed', "updatedAt" = NOW()
            WHERE "status" = 'failed' AND "completedAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "failValidForMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
            UPDATE jobs_t SET "status" = 'expiredCompleted', "updatedAt" = NOW()
            WHERE "status" = 'completed' AND "completedAt" < NOW() - (
              SELECT make_interval(0, 0, 0, 0, 0, 0, "successValidForMs" / 1000.0)
              FROM job_types_t WHERE "jobType" = jobs_t."jobType"
            );
          `),
        )
      } catch (err) {
        console.error('Error in maintenance process:', err)
      }
      await new Promise(resolve => setTimeout(resolve, intervalMs))
    }
  }
}
export { type JobStatus, type StatusObj, type JobHandler } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment