Last active
June 2, 2022 20:04
-
-
Save johnsonjo4531/932f50dde3ec1d719cb89dd6579ef84b to your computer and use it in GitHub Desktop.
Pingpong: a coroutine-like library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { pingpong } from "./pingpong"; | |
/** Resolves (with no value) once `ms` milliseconds have elapsed on a `setTimeout` timer. */
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));
/**
 * Example: a generator that pushes an incrementing counter once per second and
 * lets the outside overwrite that counter.
 *
 * The outside logs every number it receives, bumps the counter to 100 when it
 * sees 10, and stops once the counter passes 110 (or a falsy item arrives).
 */
(async () => {
  const controller = pingpong<number, { done: true } | number>(async function ({
    receive,
    send
  }) {
    let num = 0;
    // Shared stop flag so the ticker ends together with the listener instead
    // of scheduling timers (and writes) forever.
    let running = true;

    // Ticker: publish the next counter value once per second.
    (async () => {
      while (running) {
        await sleep(1000);
        if (!running) return;
        void send(++num);
      }
    })();

    // Listener: adopt whatever number the outside sends and echo it back.
    (async () => {
      while (true) {
        const value = await receive();
        // `{ done: true }` asks us to stop; `undefined` means nothing more
        // will arrive. Either way there is nothing left to echo.
        if (typeof value !== "number") {
          running = false;
          return;
        }
        num = value;
        void send(num);
      }
    })();
  });

  console.clear();
  for await (const item of controller) {
    if (!item || item > 110) {
      // Tell the generator to stop *before* leaving the loop: leaving the
      // loop runs the iterator's cleanup, after which nothing can be sent.
      await controller.send({ done: true });
      // `break` (not `return`) so the summary logging below still runs.
      break;
    }
    if (item === 10) {
      void controller.send(100);
    }
    console.log(item);
  }
  console.log("done");
  console.log(await controller.result);
})();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Shape of the async function handed to `pingpong`.
 *
 * It is called once with a set of controllers for talking to the outside
 * world; the promise it returns is exposed to callers as `result`.
 *
 * @typeParam InternalSend - values this function sends to the outside.
 * @typeParam InternalReceive - values this function receives from the outside.
 */
type PingPongGenType<InternalSend, InternalReceive> = (controllers: {
  /** Next value sent by the outside; may resolve to `undefined` when no value is available. */
  receive: () => Promise<InternalReceive | undefined>;
  /** Values sent by the outside, as an async iterable. */
  receiveAll: () => AsyncGenerator<
    Awaited<InternalReceive> | undefined,
    void,
    unknown
  >;
  /** Runs cleanup and resolves when it has finished. */
  return: () => Promise<void>;
  /** Runs cleanup, then rejects with `err`. */
  throw: (err: unknown) => Promise<never>;
  /** Sends `output` to the outside. */
  send: (output: InternalSend) => Promise<void>;
}) => Promise<void>;
/**
 * Turns a stream reader into an async generator that yields every chunk until
 * the stream reports `done`.
 *
 * @param stream - reader to pull chunks from.
 * @param onExit - hooks run when iteration stops:
 *   - `throw(err)` is called when reading (or the consumer, via
 *     `generator.throw`) fails; whatever it throws is what the consumer sees.
 *   - `return()` is always called last, whether iteration ended normally,
 *     early (consumer `break`/`return`), or with an error.
 */
async function* consumeStream<T>(
  stream: ReadableStreamDefaultReader<T>,
  onExit: {
    throw: (err?: any) => Promise<never> | never;
    return: () => Promise<void> | void;
  }
) {
  try {
    for (;;) {
      const step = await stream.read();
      if (step.done) return;
      yield step.value;
    }
  } catch (err) {
    await onExit.throw(err);
  } finally {
    // Deliberately no `return` here: returning from `finally` would replace
    // the error raised by `onExit.throw` above and silently swallow it.
    await onExit.return();
  }
}
/**
 * Starts `generator` and connects it to the caller through two one-way
 * channels (one `TransformStream` per direction).
 *
 * The generator gets controllers to `receive`/`receiveAll` what the caller
 * sends and to `send` values back; the returned object is the caller's mirror
 * image of that, is async-iterable, and exposes the generator's promise as
 * `result`.
 *
 * Shutdown: `return()` / `throw(err)` on either side, or leaving a
 * `for await` over the returned object, closes both channels. Pending and
 * unread values are discarded, pending `receive()` calls resolve to
 * `undefined`, and the stream locks are released, so later `send`/`receive`
 * calls reject.
 *
 * @param generator - async function driven with the internal controllers.
 * @returns the external controllers plus `result`.
 */
export function pingpong<InternalSend, InternalReceive>(
  generator: PingPongGenType<InternalSend, InternalReceive>
) {
  type ExternalSend = InternalReceive;
  type ExternalReceive = InternalSend;

  // internalStream carries generator -> caller, externalStream caller -> generator.
  const internalStream = new TransformStream<InternalSend, InternalSend>();
  const externalStream = new TransformStream<ExternalSend, ExternalSend>();
  const internalStreamReader = internalStream.readable.getReader();
  const internalStreamWriter = internalStream.writable.getWriter();
  const externalStreamReader = externalStream.readable.getReader();
  const externalStreamWriter = externalStream.writable.getWriter();

  // Exit hooks for iterations that should have no side effects when they end.
  const passiveHooks = {
    async throw(err: unknown): Promise<never> {
      throw err;
    },
    async return(): Promise<void> {}
  };

  /** Reads and discards whatever is left on `reader` until its stream ends. */
  async function discardRemaining<T>(
    reader: ReadableStreamDefaultReader<T>
  ): Promise<void> {
    for await (const _ of consumeStream(reader, passiveHooks)) {
      // intentionally empty: the chunks are thrown away
    }
  }

  let shutdown: Promise<void> | undefined;

  /**
   * Closes both channels and releases every lock. Runs at most once; later
   * calls share the first run's promise.
   */
  function cleanup(): Promise<void> {
    if (shutdown === undefined) {
      shutdown = (async () => {
        // The writers must be closed or the drains below never see the end of
        // their streams. The drains must run concurrently with the closes
        // because a close waits for queued writes, which wait for a read.
        await Promise.allSettled([
          internalStreamWriter.close(),
          externalStreamWriter.close(),
          discardRemaining(internalStreamReader),
          discardRemaining(externalStreamReader)
        ]);
        internalStreamWriter.releaseLock();
        externalStreamWriter.releaseLock();
        internalStreamReader.releaseLock();
        externalStreamReader.releaseLock();
      })();
    }
    return shutdown;
  }

  /** Cleans up, then rejects with `err`. */
  async function fail(err: unknown): Promise<never> {
    await cleanup();
    throw err;
  }

  const result = generator({
    async receive() {
      return (await externalStreamReader.read()).value;
    },
    // Iterating stops when the channel is closed; it does not trigger cleanup.
    receiveAll: () => consumeStream(externalStreamReader, passiveHooks),
    return: cleanup,
    throw: fail,
    async send(output) {
      return internalStreamWriter.write(output);
    }
  });

  // Every value the generator sends; leaving the iteration (normally, early,
  // or through an error) shuts both channels down.
  const receiveAll = () =>
    consumeStream(internalStreamReader, { throw: fail, return: cleanup });

  return {
    /** Next value sent by the generator, or `undefined` once the channel is closed. */
    async receive(): Promise<ExternalReceive | undefined> {
      return (await internalStreamReader.read()).value;
    },
    /** Sends `output` to the generator. */
    send(output: ExternalSend) {
      return externalStreamWriter.write(output);
    },
    receiveAll,
    /** Shuts both channels down. */
    return: cleanup,
    /** Shuts both channels down, then rejects with `err`. */
    throw: fail,
    [Symbol.asyncIterator]() {
      return receiveAll();
    },
    /** The promise returned by `generator`. */
    result
  } as const;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://codesandbox.io/s/ping-pong-fj5d8r?file=/src/example-usage.ts