Created
March 10, 2024 16:48
-
-
Save cowboyd/9350650be830bf4648f60ed8aadc5e5a to your computer and use it in GitHub Desktop.
Model a remote iterable using `EventSource` and Effection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { main } from "npm:[email protected]"; | |
import { useEventStream } from "./use-event-stream.ts"; | |
// consume the server as an iterable.
// Subscribe to the remote event stream and print every item it produces,
// followed by the value the stream closes with.
await main(function* () {
  const subscription = yield* useEventStream<number, string>("http://localhost:8000");

  for (;;) {
    const result = yield* subscription.next();
    // intermediate items and the final `done` value are both printed
    console.log(result.value);
    if (result.done) {
      break;
    }
  }
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
createSignal, | |
once, | |
race, | |
resource, | |
spawn, | |
type Stream, | |
} from "npm:[email protected]"; | |
import { EventSource } from "https://deno.land/x/[email protected]/mod.ts"; | |
/**
 * Model a remote iterable as an Effection stream backed by an `EventSource`.
 *
 * - every "yield" event has its `data` parsed as JSON and sent as an item
 * - a "return" event has its `data` parsed as JSON and closes the stream
 *   with that value
 * - an "error" event is thrown (as the raw event object) from a spawned task
 *
 * The parsed JSON is cast to `T` / `TDone` without validation, so the caller
 * is trusting the server to send the shapes it asked for.
 *
 * @param url - address handed to the `EventSource` constructor
 * @returns a stream of the values carried by "yield" events, closing with
 *   the value carried by the "return" event
 */
export function useEventStream<T, TDone>(url: string): Stream<T, TDone> {
  return resource(function* (provide) {
    let source = new EventSource(url);
    let signal = createSignal<T, TDone>();

    // send { done: false, value } to the stream
    let onYield = function (event: Event) {
      let data = JSON.parse((event as MessageEvent).data);
      signal.send(data as T);
    };

    // close the stream with { done: true, value }
    let onReturn = function (event: Event) {
      let data = JSON.parse((event as MessageEvent).data);
      signal.close(data as TDone);
    };

    try {
      // blow up if you get an error event. this will be automatically halted
      // if the stream exits successfully.
      yield* spawn(function* () {
        throw yield* once(source, "error");
      });

      source.addEventListener("yield", onYield);
      source.addEventListener("return", onReturn);

      // close out this resource when the computation finishes
      // or we receive the "return" event
      yield* race([once(source, "return"), provide(yield* signal)]);
    } finally {
      source.removeEventListener("yield", onYield);
      source.removeEventListener("return", onReturn);
      // release the underlying connection; without this the EventSource
      // stays open (and may try to reconnect) after the resource is gone.
      source.close();
    }
  });
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ServerSentEventStream } from "jsr:@std/http"; | |
import { call, main, sleep, suspend, useScope } from "npm:[email protected]"; | |
// Serve a five-step countdown to each client as server-sent events, and keep
// serving until this main operation is shut down.
await main(function* () {
  const controller = new AbortController();
  const scope = yield* useScope();

  const server = Deno.serve({
    handler: (_req) => {
      const events = new ServerSentEventStream();

      // drive the countdown for this request as an operation in the
      // scope captured above
      scope.run(function* () {
        const writer = events.writable.getWriter();
        try {
          let remaining = 5;
          while (remaining >= 1) {
            const data = JSON.stringify(remaining);
            yield* call(() => writer.write({ event: "yield", data }));
            yield* sleep(1000);
            remaining -= 1;
          }
          // the "return" event carries the closing value
          const farewell = JSON.stringify("blast off!");
          yield* call(() => writer.write({ event: "return", data: farewell }));
        } finally {
          yield* call(() => writer.close());
        }
      });

      const headers = {
        "content-type": "text/event-stream",
        "cache-control": "no-cache",
      };
      return new Response(events.readable, { headers });
    },
    signal: controller.signal,
  });

  try {
    // stay alive until the main operation is halted
    yield* suspend();
  } finally {
    // abort the server's signal and wait for it to report that it finished
    controller.abort();
    yield* call(() => server.finished);
  }
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment