/**
 * Like ReadableStream.tee(), but for any async iterable.
 * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/tee
 *
 * See: MattiasBuelens/web-streams-polyfill#80
 */
/// <reference lib="esnext" />
interface QueueNode<T> {
  value: T;
  next: QueueNode<T> | undefined;
}

function teeAsyncIterator<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
  const source = iterable[Symbol.asyncIterator]();
  // A linked list of enqueued chunks. (The first node has no value.)
  let queue: QueueNode<T> = {
    value: undefined!,
    next: undefined
  };
  // Which branches have already been closed.
  let closed: [boolean, boolean] = [false, false];
  // Whether we're currently reading from the source.
  let reading = false;
  // Whether the source stream has closed.
  let done = false;
  // A promise for the current read (if reading is true).
  let currentRead: Promise<void> | undefined;

  async function next(): Promise<void> {
    reading = true;
    const result = await source.next();
    if (result.done) {
      done = true;
    } else {
      const nextNode: QueueNode<T> = {
        value: result.value,
        next: undefined
      };
      queue.next = nextNode;
      queue = nextNode;
    }
    reading = false;
  }

  async function* branch(i: 0 | 1, buffer: QueueNode<T>): AsyncGenerator<T, undefined, undefined> {
    try {
      while (true) {
        if (buffer.next) {
          buffer = buffer.next;
          yield buffer.value;
        } else if (done) {
          return;
        } else {
          if (!reading) {
            currentRead = next();
          }
          await currentRead;
        }
      }
    } finally {
      closed[i] = true;
      // Close source iterator if both branches are closed
      // Important: don't call return() if next() returned {done: true}!
      if (!done && closed[1 - i]) {
        await source.return?.();
      }
    }
  }

  return [
    branch(0, queue),
    branch(1, queue)
  ];
}
All right, I gave `WeakRef`s a try. It's far from perfect though: I only check whether the branches have been garbage collected after a successful `next()` call, or after one of the branches is closed with `return()`. There are probably some cases that I'm still missing...
I don't even know if it's a good idea to rely on these iterators actually getting garbage collected in order to close the source iterator... As MDN explains, they might live longer than expected, or possibly never get collected at all. 🤷‍♂️
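To make the `WeakRef` idea above concrete, here is a minimal sketch of the check itself. `BranchTracker`, `track()` and `allCollected()` are hypothetical names used only for illustration and are not taken from the gist; the only real API used is `WeakRef` and its `deref()` method.

```typescript
/// <reference lib="esnext" />

// Hypothetical helper (not part of the gist): remembers branch objects
// weakly, so that tracking them does not keep them alive.
class BranchTracker {
  private refs: WeakRef<object>[] = [];

  track(branch: object): void {
    this.refs.push(new WeakRef(branch));
  }

  // True only if every tracked branch has been garbage collected.
  // deref() returns undefined once its target is collected, but the engine
  // makes no promise about when (or whether) that ever happens.
  allCollected(): boolean {
    return this.refs.every((ref) => ref.deref() === undefined);
  }
}

const tracker = new BranchTracker();
const branchA = { name: "a" };
const branchB = { name: "b" };
tracker.track(branchA);
tracker.track(branchB);

// Both branches are still strongly referenced here, so they count as alive.
const stillAlive = !tracker.allCollected();
```

A tee built this way could call `allCollected()` after each successful read and close the source when it returns `true`. As noted above, that only helps if the engine actually collects the branches, which is not guaranteed.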
There you go, now with a linked list.
This version doesn't try to close the source iterator if both branches are garbage collected. But at least all buffered chunks will have been garbage collected as well.
I don't know which version has the best performance. My intuition says "linked lists are bad" and that they create more garbage than a regular linear array. But I have to admit, linked lists do make it easier to automatically throw away the unused buffered chunks.
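The reason the linked list throws away chunks "automatically" can be seen in a stripped-down, synchronous sketch. `SharedList` and `Cursor` are hypothetical names for illustration only. The producer holds just the tail and each consumer holds just its own position, so a node that every cursor has moved past is no longer referenced by anything and becomes eligible for garbage collection.

```typescript
interface ListNode<T> {
  value: T;
  next: ListNode<T> | undefined;
}

// One consumer's position in the shared list.
class Cursor<T> {
  private node: ListNode<T>;

  constructor(start: ListNode<T>) {
    this.node = start;
  }

  // Returns the next unread value, or undefined when caught up.
  // (A sketch: this cannot distinguish "caught up" from an undefined value.)
  read(): T | undefined {
    const next = this.node.next;
    if (next === undefined) return undefined;
    this.node = next; // drop our reference to the node we just left
    return next.value;
  }
}

// Append-only list; the first node is a sentinel without a value.
class SharedList<T> {
  private tail: ListNode<T> = { value: undefined as unknown as T, next: undefined };

  push(value: T): void {
    const node: ListNode<T> = { value, next: undefined };
    this.tail.next = node;
    this.tail = node;
  }

  // A new cursor starts at the current tail and sees only later pushes.
  cursor(): Cursor<T> {
    return new Cursor(this.tail);
  }
}

const list = new SharedList<number>();
const a = list.cursor();
const b = list.cursor();
list.push(1);
list.push(2);
const fromA = [a.read(), a.read(), a.read()]; // a runs ahead and catches up
const fromB = [b.read()];                     // b lags behind
list.push(3);
fromB.push(b.read(), b.read());
```

The flip side, raised elsewhere in this thread, is that a cursor which is never advanced but is still referenced keeps every later node alive.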
Sweet, though not the kind of buffer branch I was hoping for. I was hoping for something along the lines of what yocto-queue did with the `Node#head`... Like we discussed in the issue, I have some concerns about GC when one of the branches is forgotten and never used anymore. But I would totally use this anyway.
Looks very much like the sync iterator on SO, with a few minor changes to make it async.
If you come up with an object-based solution (like the yocto-queue) instead of using:
... or if you add a WeakRef to it and post it on that Stack Overflow question as an answer, then I will totally give you a bounty reward for it.
(Object mode instead of `buffers`, slice, push === higher reward)