Created
November 26, 2019 09:43
-
-
Save royra/988d541c84a5509538576c460092fd2e to your computer and use it in GitHub Desktop.
Convert an "old" JS stream to AsyncIterable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Convert an "old" stream to AsyncIterable. | |
// | |
// Stream are "push" interfaces and iterators are "pull" interfaces. So there needs | |
// to be some kind of queue to accumulate the pushed data while waiting for "pulls", | |
// i.e, calls to next(). In this implementation the queue is the pendingValues array. | |
// | |
// Note: Not needed for current implementations of NodeJS Readable stream, as they are | |
// already AsyncIterable. | |
// | |
interface OldReadable { | |
on(event: "close", listener: () => void): this; | |
on(event: "data", listener: (chunk: any) => void): this; | |
on(event: "end", listener: () => void): this; | |
on(event: "error", listener: (err: Error) => void): this; | |
} | |
type PromiseResolver<T> = { | |
resolve: (result: T) => void | |
reject: (err: Error) => void | |
} | |
export const asAsyncIterable = <T>(stream: OldReadable): AsyncIterable<T> => { | |
let done = false | |
let error: Error | |
let pendingNextPromise: PromiseResolver<IteratorResult<T>> | null = null | |
const pendingValues: T[] = [] | |
const onEnd = () => { | |
done = true | |
if (pendingNextPromise) { | |
pendingNextPromise.resolve({ done: true, value: undefined }) | |
pendingNextPromise = null | |
} | |
} | |
stream.on('end', onEnd) | |
stream.on('close', onEnd) | |
stream.on('error', err => { | |
error = err | |
if (pendingNextPromise) { | |
pendingNextPromise.reject(err) | |
pendingNextPromise = null | |
} | |
}) | |
stream.on('data', (value: T) => { | |
if (pendingNextPromise) { | |
pendingNextPromise.resolve({ done: false, value }) | |
pendingNextPromise = null | |
} else { | |
pendingValues.push(value) | |
} | |
}) | |
return { | |
[Symbol.asyncIterator](): AsyncIterator<T> { | |
return { | |
next(): Promise<IteratorResult<T>> { | |
// return pending values before errors | |
if (pendingValues.length > 0) { // can't replace length check with shift and a test for | |
// undefined, since T itself might be the type undefined | |
const value = pendingValues.shift() as T | |
return Promise.resolve({ done: false, value }) | |
} | |
// return errors before "end" | |
if (error !== undefined) { | |
return Promise.reject(error) | |
} | |
if (done) { | |
return Promise.resolve({ done: true, value: undefined }) | |
} | |
// no available value - set pending promise | |
return new Promise((resolve, reject) => pendingNextPromise = { resolve, reject }) | |
}, | |
} | |
}, | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment