Skip to content

Instantly share code, notes, and snippets.

@prescience-data
Last active November 25, 2022 01:47
Show Gist options
  • Save prescience-data/b7e7f0084b6628e4839236caf27f1e63 to your computer and use it in GitHub Desktop.
Save prescience-data/b7e7f0084b6628e4839236caf27f1e63 to your computer and use it in GitHub Desktop.
Simple Queue
/** Types **/

/** Unique identifier of a queued job. */
type JobId = string

/** Body of a job: a zero-argument function that asynchronously produces a `T`. */
type JobHandler<T> = () => Promise<T>

/**
 * Book-keeping record for a single queued job.
 *
 * The result type defaults to `unknown` (rather than `any`) so that consumers
 * must narrow a result before using it.
 */
interface Job<T = unknown> {
  /** Identifier the job was registered under. */
  id: JobId
  /** Function invoked to run the job. */
  handle: JobHandler<T>
  /** Set to `true` once a worker has claimed the job. */
  locked: boolean
  /** Value produced by `handle`; absent until the job has completed. */
  result?: T
}
/**
 * Simple async queue.
 *
 * Jobs run one at a time, in the order they were added, on a single worker
 * loop launched by `start()`. A new queue is paused until `start()` (or
 * `resume()`) is called.
 *
 * Events emitted:
 * - `add`: a job was added.
 * - `pause` / `resume`: processing was paused / resumed.
 * - `result`: a job completed; payload is `{ id, result }`.
 * - `result-<id>`: the job with that id completed; payload is its result.
 * - `error-<id>`: the job with that id failed; payload is the thrown value.
 * - `error`: a job failed and an `error` listener is registered, or the
 *   worker loop itself failed.
 *
 * @public
 */
export class Queue extends EventEmitter {
  /**
   * Jobs that have been added and not yet completed.
   *
   * @internal
   */
  readonly #jobs: Map<JobId, Job> = new Map()

  /**
   * Signals if queue is paused.
   *
   * @internal
   */
  #isPaused: boolean = true

  /**
   * Signals that a worker loop is already active, so that repeated calls to
   * `start()` do not spawn competing loops.
   *
   * @internal
   */
  #isRunning: boolean = false

  /**
   * Adds a handler to the queue and awaits result.
   *
   * @param handle - Job handler body.
   * @returns A promise that resolves with the handler's result, or rejects
   * with whatever the handler threw / rejected with.
   *
   * @public
   */
  public process<T>(handle: JobHandler<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const id: JobId = randomUUID()
      const resultEvent = `result-${id}`
      const errorEvent = `error-${id}`

      // Exactly one of the two listeners fires; each one removes its sibling
      // so that no listener is left behind on the emitter.
      const onResult = (result: T): void => {
        this.off(errorEvent, onError)
        resolve(result)
      }
      const onError = (error: unknown): void => {
        this.off(resultEvent, onResult)
        reject(error)
      }

      this.once(resultEvent, onResult)
      this.once(errorEvent, onError)
      this.add(id, handle)
    })
  }

  /**
   * Adds a handler to the queue and returns job id to await.
   *
   * Listen for `result-<id>` / `error-<id>` to observe the outcome. Adding a
   * job under an id that is still pending replaces the pending entry.
   *
   * @param id - Unique job id.
   * @param handle - Job handler body.
   * @returns The id that was passed in.
   *
   * @public
   */
  public add<T>(id: JobId, handle: JobHandler<T>): JobId {
    this.#jobs.set(id, { id, handle, locked: false })
    this.emit("add")
    return id
  }

  /**
   * Starts the queue.
   *
   * Always resumes processing; the worker loop itself is only launched if one
   * is not already running.
   *
   * @public
   */
  public start(): void {
    this.resume()
    if (this.#isRunning) {
      return
    }
    this.#isRunning = true
    this.#handleNextJob().catch((error: unknown) => {
      // The loop has died: allow a later start() to launch a new one.
      this.#isRunning = false
      this.emit("error", error)
    })
  }

  /**
   * Pauses processing. A job whose handler is already executing is not
   * interrupted; the next job is held back until `resume()`.
   *
   * @public
   */
  public pause(): void {
    this.#isPaused = true
    this.emit("pause")
  }

  /**
   * Resumes processing.
   *
   * @public
   */
  public resume(): void {
    this.#isPaused = false
    this.emit("resume")
  }

  /**
   * Returns an array of any jobs not locked by another worker.
   *
   * @internal
   */
  #unlockedJobs(): Job[] {
    return [...this.#jobs.values()].filter(({ locked }) => !locked)
  }

  /**
   * Drops a finished job from the pending map.
   *
   * The identity check protects a newer job that was added under the same id
   * while the finished one was still executing.
   *
   * @internal
   */
  #release(job: Job): void {
    if (this.#jobs.get(job.id) === job) {
      this.#jobs.delete(job.id)
    }
  }

  /**
   * Worker loop: runs pending jobs one by one and, when none remain, awaits a
   * new job add signal.
   *
   * Implemented as a loop rather than tail recursion so that a long-lived
   * queue does not accumulate a chain of pending async frames.
   *
   * @internal
   */
  async #handleNextJob(): Promise<void> {
    for (;;) {
      // Wait for new job if none in stack.
      if (!this.#unlockedJobs().length) {
        await new Promise<void>((resolve) => this.once("add", resolve))
      }

      // Extract next job.
      const [job] = this.#unlockedJobs()
      if (!job) {
        // Nothing claimable after all; go back to waiting.
        continue
      }
      job.locked = true

      // If paused, wait until resumed.
      if (this.#isPaused) {
        await new Promise<void>((resolve) => this.once("resume", resolve))
      }

      try {
        job.result = await job.handle()
      } catch (error: unknown) {
        this.#release(job)
        const observed = this.emit(`error-${job.id}`, error)
        if (this.listenerCount("error") > 0) {
          this.emit("error", error)
        } else if (!observed) {
          // Nobody is listening for this failure: surface it through the
          // loop's rejection instead of swallowing it silently.
          throw error
        }
        continue
      }

      this.#release(job)
      this.emit(`result`, { id: job.id, result: job.result })
      this.emit(`result-${job.id}`, job.result)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment