Skip to content

Instantly share code, notes, and snippets.

@kandros
Last active March 25, 2025 19:10
Show Gist options
  • Save kandros/92590ce3d557dd6fbabe7f2339be87f1 to your computer and use it in GitHub Desktop.
Save kandros/92590ce3d557dd6fbabe7f2339be87f1 to your computer and use it in GitHub Desktop.
// lib/server/sse/event-bus.ts
import { EventEmitter } from "events";
import { logger } from "~/lib/logger/logger";
/** Envelope handed to every subscriber callback when an event is published for a session. */
interface SessionEvent {
// Event name chosen by the publisher (e.g. "connected" or "heartbeat").
type: string;
// Publisher-supplied payload; passed through to subscribers unchanged.
data: Object;
// Creation time of the event as an ISO-8601 string (set by publish()).
timestamp: string;
}
/**
 * In-process publish/subscribe hub keyed by session id.
 *
 * A single shared instance (see {@link SessionEventBus.getInstance}) wraps a
 * Node `EventEmitter`; each session id maps to one emitter event name.
 */
class SessionEventBus {
  private static instance: SessionEventBus | undefined;
  private readonly emitter: EventEmitter;

  private constructor() {
    this.emitter = new EventEmitter();
    // 0 disables the listener-count warning: many connections may listen at once.
    this.emitter.setMaxListeners(0);
  }

  /** Returns the shared bus, creating it on first use. */
  public static getInstance(): SessionEventBus {
    if (!SessionEventBus.instance) {
      SessionEventBus.instance = new SessionEventBus();
    }
    return SessionEventBus.instance;
  }

  /**
   * Maps a session id to the emitter event name used internally.
   *
   * The prefix keeps arbitrary ids away from names EventEmitter treats
   * specially: emitting "error" with no listener throws, and "newListener" /
   * "removeListener" are fired by the emitter itself.
   */
  private static eventName(sessionId: string): string {
    return `session:${sessionId}`;
  }

  /**
   * Registers `callback` to receive every event published for `sessionId`.
   *
   * @param sessionId - Session whose events should be delivered.
   * @param callback - Invoked synchronously from `publish` with each event.
   */
  public subscribe(sessionId: string, callback: (event: SessionEvent) => void) {
    logger.debug("Subscribing to session events", { sessionId });
    this.emitter.on(SessionEventBus.eventName(sessionId), callback);
  }

  /**
   * Removes a callback previously passed to `subscribe` for `sessionId`.
   *
   * @param sessionId - Session the callback was registered for.
   * @param callback - The same function reference that was subscribed.
   */
  public unsubscribe(sessionId: string, callback: (event: SessionEvent) => void) {
    logger.debug("Unsubscribing from session events", { sessionId });
    this.emitter.off(SessionEventBus.eventName(sessionId), callback);
  }

  /**
   * Builds a timestamped event and delivers it to all subscribers of `sessionId`.
   *
   * @param sessionId - Session whose subscribers should receive the event.
   * @param type - Event name placed in the envelope.
   * @param data - Payload placed in the envelope unchanged.
   */
  public publish(sessionId: string, type: string, data: Object) {
    const event: SessionEvent = {
      type,
      data,
      timestamp: new Date().toISOString(),
    };
    logger.debug("Publishing event to session", {
      sessionId,
      type,
      data,
    });
    this.emitter.emit(SessionEventBus.eventName(sessionId), event);
    logger.info("Published event to session", { sessionId, type, data });
  }
}
/** Shared bus instance, obtained via SessionEventBus.getInstance(); other modules import this rather than constructing their own. */
export const sessionEventBus = SessionEventBus.getInstance();
// app/routes/api/sse/index.ts
import { createAPIFileRoute } from "@tanstack/react-start/api";
import { getEvent } from "@tanstack/react-start/server";
import { getSessionId } from "~/lib/server/http-utils";
import { sessionEventBus } from "~/lib/server/sse/event-bus";
import { logger } from "~/lib/logger/logger";
/** How often a heartbeat frame is written to each open stream, in milliseconds. */
const HEARTBEAT_INTERVAL_MS = 1000;

/**
 * Server-Sent Events endpoint (`GET /api/sse`).
 *
 * Responds 401 when no session id can be resolved. Otherwise it opens a
 * `text/event-stream` response that:
 *  - forwards every event published on the session bus for this session,
 *  - writes a "connected" frame as soon as the stream starts, and
 *  - writes a "heartbeat" frame every `HEARTBEAT_INTERVAL_MS`.
 *
 * Each frame is `data: <json>\n\n`, where the JSON has the shape
 * `{ type, data, timestamp }`.
 */
export const APIRoute = createAPIFileRoute("/api/sse")({
  GET: async () => {
    const sessionId = await getSessionId();
    if (!sessionId) {
      return new Response("Unauthorized", { status: 401 });
    }

    const responseHeaders = {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    };

    const event = getEvent();

    // Assigned inside start() (which runs synchronously in the ReadableStream
    // constructor) so that cancel() can release the same resources.
    let cleanup: (() => void) | undefined;

    const stream = new ReadableStream<Uint8Array>({
      start(controller) {
        const encoder = new TextEncoder();
        let closed = false;

        const send = (message: string) => {
          // Enqueueing on a closed or cancelled controller throws, so drop late frames.
          if (closed) return;
          controller.enqueue(encoder.encode(`data: ${message}\n\n`));
        };

        // Writes a frame to this connection only, using the same envelope
        // shape as events delivered through the bus.
        const sendDirect = (type: string, data: object) => {
          send(JSON.stringify({ type, data, timestamp: new Date().toISOString() }));
        };

        const handleSessionEvent = (eventData: unknown) => {
          send(JSON.stringify(eventData));
        };

        // Subscribe before emitting anything so no session event is missed.
        sessionEventBus.subscribe(sessionId, handleSessionEvent);

        // Heartbeats are written directly to this stream rather than published
        // on the bus; otherwise every connection sharing the session would
        // receive one heartbeat per open connection on each tick.
        const heartbeatInterval = setInterval(() => {
          sendDirect("heartbeat", {
            timestamp: new Date().toISOString(),
            sessionId,
          });
        }, HEARTBEAT_INTERVAL_MS);

        // Idempotent teardown shared by the request "close" handler and cancel().
        cleanup = () => {
          if (closed) return;
          closed = true;
          clearInterval(heartbeatInterval);
          sessionEventBus.unsubscribe(sessionId, handleSessionEvent);
        };

        // Handle client disconnection
        event.node.req.on("close", () => {
          logger.info("Client disconnected", { sessionId });
          const wasOpen = !closed;
          cleanup?.();
          if (wasOpen) {
            try {
              controller.close();
            } catch {
              // The stream was already closed or errored by the consumer.
              logger.debug("SSE stream already closed", { sessionId });
            }
          }
        });

        // Send initial connection confirmation to this client.
        sendDirect("connected", { sessionId });
      },
      cancel() {
        logger.info("Stream cancelled", { sessionId });
        cleanup?.();
      },
    });

    return new Response(stream, { headers: responseHeaders });
  },
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment