Last active
March 13, 2025 19:18
Teeing an async iterator
/**
 * Like ReadableStream.tee(), but for any async iterable.
 * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/tee
 *
 * See: MattiasBuelens/web-streams-polyfill#80
 */

/// <reference lib="esnext" />

interface QueueNode<T> {
  value: T;
  next: QueueNode<T> | undefined;
}

function teeAsyncIterator<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
  const source = iterable[Symbol.asyncIterator]();
  // A linked list of enqueued chunks. (The first node has no value.)
  let queue: QueueNode<T> = {
    value: undefined!,
    next: undefined
  };
  // Which branches have already been closed.
  let closed: [boolean, boolean] = [false, false];
  // Whether we're currently reading from the source.
  let reading = false;
  // Whether the source stream has closed.
  let done = false;
  // A promise for the current read (if reading is true).
  let currentRead: Promise<void> | undefined;

  async function next(): Promise<void> {
    reading = true;
    const result = await source.next();
    if (result.done) {
      done = true;
    } else {
      const nextNode: QueueNode<T> = {
        value: result.value,
        next: undefined
      };
      queue.next = nextNode;
      queue = nextNode;
    }
    reading = false;
  }

  async function* branch(i: 0 | 1, buffer: QueueNode<T>): AsyncGenerator<T, undefined, undefined> {
    try {
      while (true) {
        if (buffer.next) {
          buffer = buffer.next;
          yield buffer.value;
        } else if (done) {
          return;
        } else {
          if (!reading) {
            currentRead = next();
          }
          await currentRead;
        }
      }
    } finally {
      closed[i] = true;
      // Close source iterator if both branches are closed
      // Important: don't call return() if next() returned {done: true}!
      if (!done && closed[1 - i]) {
        await source.return?.();
      }
    }
  }

  return [
    branch(0, queue),
    branch(1, queue)
  ];
}
There you go, now with a linked list.
This version doesn't try to close the source iterator if both branches are garbage collected. But at least all buffered chunks will have been garbage collected as well.
I don't know which version has the best performance. My intuition says "linked lists are bad": they create more garbage than a regular linear array. But I have to admit, linked lists do make it easier to automatically throw away the unused buffered chunks.
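For comparison, here is roughly what an array-backed buffer would have to do by hand. This is only a sketch: `ArrayTeeBuffer`, `push`, `shift` and `size` are hypothetical names, not taken from either version of the gist. Each branch needs its own read position, and chunks are only dropped once both branches have read past them.

```typescript
// Hypothetical array-backed alternative to the linked list (not part of the gist).
class ArrayTeeBuffer<T> {
  private chunks: T[] = [];
  // Absolute stream index of chunks[0].
  private base = 0;
  // Absolute read position of each branch.
  private positions: [number, number] = [0, 0];

  push(chunk: T): void {
    this.chunks.push(chunk);
  }

  // Returns the next unread chunk for branch i, or undefined if the branch
  // has caught up. (Assumes undefined is never a real chunk value.)
  shift(i: 0 | 1): T | undefined {
    const index = this.positions[i] - this.base;
    if (index >= this.chunks.length) {
      return undefined;
    }
    const chunk = this.chunks[index];
    this.positions[i]++;
    this.trim();
    return chunk;
  }

  // Number of chunks currently retained.
  get size(): number {
    return this.chunks.length;
  }

  // Drop chunks that both branches have already read.
  private trim(): void {
    const consumed = Math.min(this.positions[0], this.positions[1]) - this.base;
    if (consumed > 0) {
      this.chunks.splice(0, consumed);
      this.base += consumed;
    }
  }
}
```

With the linked list, none of this bookkeeping is needed: once both branches have advanced past a node, nothing references it anymore. The array sketch would also need extra handling for a closed branch, which should no longer hold back trimming.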
All right, I gave WeakRefs a try. It's far from perfect though: I only check whether the branches have been garbage collected after a successful next() call, or after one of the branches is closed with return(). There are probably some cases that I'm still missing...

I don't even know if it's a good idea to rely on these iterators actually getting garbage collected in order to close the source iterator... As MDN explains, they might live longer than expected, or possibly never get collected at all. 🤷‍♂️
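The kind of check described above could look roughly like this. It is a minimal sketch and not the actual WeakRef version: `BranchTracker`, `allCollected` and `closeIfAbandoned` are hypothetical names, and the call sites are an assumption based on the description. Only `WeakRef` and `deref()` are standard APIs.

```typescript
/// <reference lib="esnext" />

// Hypothetical helper (not the gist's actual WeakRef version).
// Holds the branches weakly, so tracking them does not keep them alive.
class BranchTracker {
  private readonly refs: WeakRef<object>[];

  constructor(branches: object[]) {
    this.refs = branches.map((branch) => new WeakRef(branch));
  }

  // True once every tracked branch has been garbage collected.
  // deref() returns undefined after its target has been collected.
  allCollected(): boolean {
    return this.refs.every((ref) => ref.deref() === undefined);
  }
}

// Assumed call sites: after each successful source.next(), and after one
// branch is closed with return(). Closes the source if no branch is left
// to read from it, and reports whether it did so.
async function closeIfAbandoned(
  tracker: BranchTracker,
  source: AsyncIterator<unknown>
): Promise<boolean> {
  if (!tracker.allCollected()) {
    return false;
  }
  await source.return?.();
  return true;
}
```

Because the check only runs at those two points, a source whose branches are both dropped while no read is pending stays open until something else triggers the check, which is one of the missing cases.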