Last active
November 25, 2022 01:47
-
-
Save prescience-data/b7e7f0084b6628e4839236caf27f1e63 to your computer and use it in GitHub Desktop.
Simple Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Types **/ | |
type JobId = string | |
type JobHandler<T> = () => Promise<T> | |
interface Job<T = any> { | |
id: JobId | |
handle: JobHandler<T> | |
locked: boolean | |
result?: any | |
} | |
/** | |
* Simple async queue. | |
* | |
* @public | |
*/ | |
export class Queue extends EventEmitter { | |
/** | |
* All jobs passed to queue. | |
* | |
* @internal | |
*/ | |
#jobs: Map<JobId, Job> = new Map() | |
/** | |
* Signals if queue is paused. | |
* | |
* @internal | |
*/ | |
#isPaused: boolean = true | |
/** | |
* Adds a handler to the queue and awaits result. | |
* | |
* @param handle - Job handler body. | |
* | |
* @public | |
*/ | |
public async process<T>(handle: JobHandler<T>): Promise<T> { | |
return new Promise<T>((resolve) => { | |
const id: string = randomUUID() | |
this.once(`result-${id}`, resolve) | |
this.add(id, handle) | |
}) | |
} | |
/** | |
* Adds a handler to the queue and returns job id to await. | |
* | |
* @param id - Unique job id. | |
* @param handle - Job handler body. | |
* | |
* @public | |
*/ | |
public add<T>(id: JobId, handle: JobHandler<T>): JobId { | |
this.#jobs.set(id, { id, handle, locked: false }) | |
this.emit("add") | |
return id | |
} | |
/** | |
* Starts the queue. | |
* | |
* @public | |
*/ | |
public start(): void { | |
this.resume() | |
this.#handleNextJob().catch((error) => this.emit("error", error)) | |
} | |
/** | |
* Pauses processing. | |
* | |
* @public | |
*/ | |
public pause(): void { | |
this.#isPaused = true | |
this.emit("pause") | |
} | |
/** | |
* Resumes processing. | |
* | |
* @public | |
*/ | |
public resume(): void { | |
this.#isPaused = false | |
this.emit("resume") | |
} | |
/** | |
* Returns an array of any jobs not locked by another worker. | |
* | |
* @internal | |
*/ | |
#unlockedJobs(): Job[] { | |
return [...this.#jobs.values()].filter(({ locked }) => !locked) | |
} | |
/** | |
* Loops until no jobs remain, then awaits a new job add signal. | |
* | |
* @internal | |
*/ | |
async #handleNextJob(): Promise<void> { | |
// Wait for new job if none in stack. | |
if (!this.#unlockedJobs().length) { | |
await new Promise<void>((resolve) => this.once("add", resolve)) | |
} | |
// Extract next job. | |
const [job] = this.#unlockedJobs() | |
job.locked = true | |
// If paused, wait until resumed. | |
if (this.#isPaused) { | |
await new Promise<void>((resolve) => this.once("resume", resolve)) | |
} | |
job.result = await job.handle() | |
this.emit(`result`, { id: job.id, result: job.result }) | |
this.emit(`result-${job.id}`, job.result) | |
return this.#handleNextJob() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment