Last active
February 28, 2022 00:08
-
-
Save Hashbrown777/4a85109c91ee577939e9447414fba9b2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Tuple `T` without its first element (`never` when `T` has no first element). */
type DropFirst<T extends unknown[]> = T extends [any, ...infer U] ? U : never;
/** Fulfils the queue's promise with the collected task results. */
type ResolveQueue<T> = (output: T[]) => void;
/** Rejects the queue's promise with the given error. */
type RejectQueue = (error: any) => void;
/** The `[resolve, reject]` pair a promise executor receives. */
type InitQueueParams<T> = [ResolveQueue<T>, RejectQueue];
/** Executor-shaped callback taking the queue's `resolve`/`reject` pair. */
type InitQueue<T> = (...args: InitQueueParams<T>) => any;
/** A deferred task: invoked only when the queue has a free slot for it. */
type Enqueued<T> = () => Promise<T>;

/**
 * A promise of an ordered result list, fed by tasks that run with bounded
 * concurrency.
 *
 * Tasks are added with {@link Queue.push}, dispatched by {@link Queue.start}
 * and the queue is closed with {@link Queue.end}. Once it is closed and every
 * task has finished, the promise fulfils with the task results in push order.
 *
 * The first task failure rejects the promise and stops the queue: tasks that
 * have not been started are dropped and never run, and results of tasks still
 * in flight are discarded.
 *
 * Build instances with {@link Queue.create}; the constructor alone cannot wire
 * up the settle functions.
 */
export class Queue<T> extends Promise<T[]> {
  /**
   * Makes `then`/`catch`/`finally` (and `await`) derive plain promises.
   * Without this they would call `new Queue(executor)`, producing a
   * half-initialised queue with no concurrency limit and no settle functions.
   */
  static get [Symbol.species](): PromiseConstructor {
    return Promise;
  }

  /** Maximum number of tasks allowed in flight at once. */
  private max: number;
  /** Tasks pushed but not yet started, oldest first. */
  private waiting: Array<Enqueued<T>>;
  /** Number of tasks currently holding a slot. */
  private current: number;
  /** Result index of the most recently started task (-1 before the first). */
  private index: number;
  /** Result slots; `null` before the first `start()` and after settling. */
  private output: T[] | null;
  /** Re-entrancy guard for `start()`; stays `true` once the queue has fulfilled. */
  private starting: boolean;
  /** Set once a task failure has rejected the queue. */
  private failed: boolean;
  private resolve!: null | ResolveQueue<T>;
  private reject!: null | RejectQueue;

  /**
   * One-shot hook that stores the settle functions captured from the promise
   * executor. It clears itself after use and returns the queue.
   */
  protected init: null | InitQueue<T> = (resolve, reject) => {
    this.resolve = resolve;
    this.reject = reject;
    this.init = null;
    return this;
  };

  /**
   * Intended to be `protected`, but TypeScript then cannot resolve
   * `ConstructorParameters<typeof Queue>` inside the static factory, so it is
   * left public. Prefer {@link Queue.create}.
   *
   * @param callback Promise executor; it runs inside `super()`, before `this`
   *   is usable, so it cannot touch the instance.
   * @param maxConcurrent Maximum number of tasks in flight. Nothing is ever
   *   dispatched unless this is greater than zero.
   */
  constructor(callback: InitQueue<T>, maxConcurrent: number) {
    super(callback);
    this.max = maxConcurrent;
    this.waiting = [];
    this.current = 0;
    this.index = -1;
    this.output = null;
    this.starting = false;
    this.failed = false;
  }

  /**
   * Creates a fully wired queue.
   *
   * @param args Constructor arguments after the executor, i.e. `maxConcurrent`.
   * @returns A one-element tuple holding the queue. The tuple is deliberate:
   *   a promise returned from an `async` function is adopted (flattened), so
   *   returning the queue itself would make callers wait for the queue to
   *   settle instead of handing them the queue.
   */
  static async create<T>(
    ...args: DropFirst<ConstructorParameters<typeof Queue>>
  ): Promise<[Queue<T>]> {
    let output: Queue<T>;
    // The executor fires synchronously inside the constructor; capture its
    // arguments and hand them to `init` once the instance exists.
    const initArgs = await new Promise<InitQueueParams<T>>((resolve) => {
      output = new Queue<T>((...settlers) => {
        resolve(settlers);
      }, ...args);
    });
    return [output!.init!(...initArgs)];
  }

  /**
   * Runs one task, stores its result at `index`, then frees the slot and
   * dispatches further work. A failure rejects the queue and halts it.
   */
  private async pop(index: number, next: Enqueued<T>): Promise<void> {
    try {
      const value = await next();
      if (this.failed) {
        // The queue was rejected while this task was in flight.
        return;
      }
      this.output![index] = value;
      --this.current;
      this.start();
    } catch (error) {
      if (this.failed) {
        return;
      }
      if (this.reject) {
        const reject = this.reject;
        // Fail fast: forbid any later settlement and release pending work so
        // that no further task is started after the rejection.
        this.failed = true;
        this.resolve = null;
        this.reject = null;
        this.waiting.length = 0;
        this.output = null;
        reject(error);
      } else {
        // No settle functions (instance not built through `create`): there is
        // nobody to report to, so free the slot and keep going.
        --this.current;
        this.start();
      }
    }
  }

  /**
   * Dispatches waiting tasks until the concurrency limit is reached, and
   * fulfils the queue when it has been ended and no work remains. Calls made
   * while a dispatch is in progress, or after the queue has settled, are
   * no-ops.
   *
   * @returns The queue, for chaining.
   */
  start(): this {
    if (this.starting || this.failed) {
      return this;
    }
    this.starting = true;
    if (this.output == null) {
      this.output = new Array<T>(this.waiting.length);
    }
    while (this.current < this.max && this.waiting.length) {
      ++this.current;
      // `pop` handles its own errors, so the returned promise is not awaited.
      void this.pop(++this.index, this.waiting.shift()!);
    }
    const ended = this.push == null;
    const idle = !(this.waiting.length || this.current);
    if (ended && this.resolve && idle && this.output != null) {
      this.resolve(this.output);
      this.output = null;
      // `starting` intentionally stays true: the queue is finished.
    } else {
      this.starting = false;
    }
    return this;
  }

  /**
   * Adds a task and returns the index its result will occupy in the output.
   * If the queue has already been started the task is dispatched as soon as a
   * slot is free. Becomes `null` once {@link Queue.end} has been called. Tasks
   * pushed after a failure are never run.
   */
  push: null | ((next: Enqueued<T>) => number) = (next: Enqueued<T>) => {
    const index = this.index + this.waiting.push(next);
    if (this.output != null) {
      // Already started: reserve the result slot and try to dispatch now.
      this.output.push(undefined as unknown as T);
      this.start();
    }
    return index;
  };

  /**
   * Closes the queue to further pushes and starts it. The promise fulfils
   * once every task pushed so far has finished.
   *
   * @returns The queue, for chaining.
   */
  end(): this {
    this.push = null;
    this.start();
    return this;
  }
}
Author
Hashbrown777
commented
May 18, 2021
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment