Skip to content

Instantly share code, notes, and snippets.

@maanimis
Created February 15, 2025 18:48
Show Gist options
  • Save maanimis/95f4ac49620e319be1cffc909d3e3acb to your computer and use it in GitHub Desktop.
Save maanimis/95f4ac49620e319be1cffc909d3e3acb to your computer and use it in GitHub Desktop.
SSE-cloudflare-worker.js
// Answer every incoming fetch event with the SSE response built by handleRequest.
addEventListener("fetch", (fetchEvent) => {
  const { request } = fetchEvent;
  const responsePromise = handleRequest(request);
  fetchEvent.respondWith(responsePromise);
});
/**
 * Build a streaming `text/event-stream` response.
 *
 * A TransformStream is used as a pipe: an SSE stream object obtained from
 * SSEStreamFactory is attached to the writable side and started, while the
 * readable side becomes the response body.
 *
 * @param {Request} request - The incoming request (not inspected here).
 * @returns {Promise<Response>} A 200 response whose body is the readable side of the pipe.
 */
async function handleRequest(request) {
  const pipe = new TransformStream();

  // Attach and start the event producer on the writable side.
  const sseStream = SSEStreamFactory.create(pipe.writable);
  sseStream.start();

  // SSE content type, caching disabled, and permissive CORS headers.
  const headers = new Headers();
  headers.set("Content-Type", "text/event-stream");
  headers.set("Cache-Control", "no-cache");
  headers.set("Connection", "keep-alive");
  headers.set("Access-Control-Allow-Origin", "*");
  headers.set(
    "Access-Control-Allow-Headers",
    "Origin, X-Requested-With, Content-Type, Accept",
  );

  const init = { status: 200, statusText: "OK", headers };
  return new Response(pipe.readable, init);
}
/**
 * Writes Server-Sent Events to the writable side of a stream and lets
 * in-process subscribers observe every event that is emitted.
 *
 * Each event is sent as one frame:
 *   id: id-<count>\n
 *   event: <eventType>\n
 *   data: <JSON payload>\n\n
 */
class SSEStream {
  /**
   * @param {WritableStream} writable - Stream the encoded SSE frames are
   *   written to. The instance takes (and holds) the writer lock.
   */
  constructor(writable) {
    this.writer = writable.getWriter();
    this.encoder = new TextEncoder();
    this.count = 0;
    this.intervalId = null;
    this.subscribers = new Map();
    // Set once cleanup() has run; makes cleanup idempotent and lets later
    // writes be skipped instead of failing on a released writer.
    this.isClosed = false;
    this.cleanupOnClose(writable);
  }

  /**
   * Send the initial "userConnected" event and then a "userMessage" event
   * every 5 seconds until the stream is cleaned up.
   */
  start() {
    if (this.isClosed) {
      // Nothing to write to; do not leave a timer running.
      return;
    }
    const logFailure = (err) => console.error("Error sending SSE event:", err);

    this.notify("userConnected", "Hello to SSE message").catch(logFailure);

    // Calling start() twice must not stack timers.
    clearInterval(this.intervalId);
    this.intervalId = setInterval(() => {
      this.count += 1;
      this.notify("userMessage", `Repeat message: ${this.count}`).catch(
        logFailure,
      );
    }, 5000);
  }

  /**
   * Emit one event to the stream and to subscribers of `eventType`.
   *
   * @param {string} eventType - Value of the SSE `event:` field.
   * @param {*} message - Placed in the `text` property of the JSON payload.
   * @returns {Promise<void>} Resolves after the write has settled and the
   *   subscribers have been called with the JSON payload string.
   */
  async notify(eventType, message) {
    const payload = JSON.stringify({
      status: true,
      text: message,
      time: new Date().toISOString(),
    });
    // One write per frame: separate writes for id/event/data could interleave
    // with the lines of another notify() call that is in flight.
    const frame = `id: id-${this.count}\nevent: ${eventType}\ndata: ${payload}\n\n`;
    await this.writeToStream(frame);
    this.subscribers.get(eventType)?.forEach((callback) => callback(payload));
  }

  /**
   * Encode `data` as UTF-8 and write it to the stream.
   *
   * Write failures are logged, not thrown, and trigger cleanup() so the
   * periodic timer stops once the stream can no longer be written to.
   * After cleanup the call is a no-op.
   *
   * @param {string} data - Raw text to write.
   * @returns {Promise<void>}
   */
  async writeToStream(data) {
    if (this.isClosed) {
      return;
    }
    try {
      await this.writer.write(this.encoder.encode(data));
    } catch (err) {
      if (this.isClosed) {
        // cleanup() already ran while this write was pending.
        return;
      }
      console.error("Error writing to stream:", err);
      this.cleanup();
    }
  }

  /**
   * Register a callback for an event type.
   *
   * @param {string} eventType - Event type to listen for.
   * @param {(payload: string) => void} callback - Called with the JSON
   *   payload string each time notify() is called for `eventType`.
   */
  subscribe(eventType, callback) {
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, []);
    }
    this.subscribers.get(eventType).push(callback);
  }

  /**
   * Run cleanup() when the stream closes or errors.
   *
   * @param {WritableStream} writable - Kept for signature compatibility.
   *   The `closed` promise lives on the writer, not on the WritableStream,
   *   so the writer obtained in the constructor is observed instead.
   */
  cleanupOnClose(writable) {
    this.writer.closed
      .then(() => this.cleanup())
      .catch((err) => {
        if (this.isClosed) {
          // releaseLock() in cleanup() rejects `closed`; that is not an error.
          return;
        }
        console.error("Stream closed with error:", err);
        this.cleanup();
      });
  }

  /**
   * Stop the periodic timer and release the writer lock. Safe to call more
   * than once.
   */
  cleanup() {
    if (this.isClosed) {
      return;
    }
    this.isClosed = true;
    clearInterval(this.intervalId);
    this.intervalId = null;
    this.writer.releaseLock();
  }
}
/**
 * Factory for SSEStream instances.
 */
class SSEStreamFactory {
  /**
   * Create an SSEStream for the given writable stream.
   *
   * @param {WritableStream} writable - Passed straight to the SSEStream constructor.
   * @returns {SSEStream} The newly constructed stream object.
   */
  static create(writable) {
    const stream = new SSEStream(writable);
    return stream;
  }
}
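// Example usage: subscribing to, and emitting, a custom event
// (`writable` is the writable side of a TransformStream, as in handleRequest):
//
// const sseStream = SSEStreamFactory.create(writable);
// sseStream.subscribe("customEvent", (data) =>
//   console.log("Custom Event Received:", data)
// );
// sseStream.notify("customEvent", "This is a custom message");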
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment