Last active
March 25, 2025 19:10
-
-
Save kandros/92590ce3d557dd6fbabe7f2339be87f1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// lib/server/sse/event-bus.ts | |
import { EventEmitter } from "events"; | |
import { logger } from "~/lib/logger/logger"; | |
/**
 * Envelope delivered to every subscriber of a session channel.
 */
interface SessionEvent {
  /** Event name chosen by the publisher (e.g. "connected", "heartbeat"). */
  type: string;
  /**
   * Event payload. Typed as the non-primitive `object` rather than the boxed
   * `Object` wrapper so that strings/numbers/booleans are rejected.
   */
  data: object;
  /** ISO-8601 timestamp string set when the event is published. */
  timestamp: string;
}
/**
 * In-process publish/subscribe hub keyed by session id.
 *
 * Each session id is used as an EventEmitter event name, so every listener
 * subscribed under a given id receives every event published to that id.
 * Use {@link SessionEventBus.getInstance} (or the exported `sessionEventBus`)
 * rather than constructing the class directly.
 */
class SessionEventBus {
  // Undefined until the first getInstance() call; the type says so explicitly.
  private static instance: SessionEventBus | undefined;
  private readonly emitter: EventEmitter;

  private constructor() {
    this.emitter = new EventEmitter();
    // 0 lifts Node's default per-event listener limit (and its leak warning),
    // since one session may legitimately have many open connections.
    this.emitter.setMaxListeners(0);
  }

  /** Returns the shared bus, creating it on first use. */
  public static getInstance(): SessionEventBus {
    if (!SessionEventBus.instance) {
      SessionEventBus.instance = new SessionEventBus();
    }
    return SessionEventBus.instance;
  }

  /**
   * Registers `callback` to receive every event published to `sessionId`.
   *
   * @param sessionId - Channel to listen on.
   * @param callback - Invoked synchronously from `publish` with the event.
   */
  public subscribe(
    sessionId: string,
    callback: (event: SessionEvent) => void,
  ): void {
    logger.debug("Subscribing to session events", { sessionId });
    this.emitter.on(sessionId, callback);
  }

  /**
   * Removes a listener previously added with `subscribe`.
   *
   * @param sessionId - Channel the listener was registered on.
   * @param callback - The same function reference that was passed to `subscribe`.
   */
  public unsubscribe(
    sessionId: string,
    callback: (event: SessionEvent) => void,
  ): void {
    logger.debug("Unsubscribing from session events", { sessionId });
    this.emitter.off(sessionId, callback);
  }

  /**
   * Wraps `data` in a timestamped event and delivers it to all current
   * subscribers of `sessionId`. Events published while nobody is subscribed
   * are dropped; nothing is buffered.
   *
   * @param sessionId - Channel to publish on.
   * @param type - Event name placed in the envelope.
   * @param data - Payload placed in the envelope.
   */
  public publish(sessionId: string, type: string, data: object): void {
    const event: SessionEvent = {
      type,
      data,
      timestamp: new Date().toISOString(),
    };
    logger.debug("Publishing event to session", {
      sessionId,
      type,
      data,
    });
    // emit() runs listeners synchronously, in registration order.
    this.emitter.emit(sessionId, event);
    logger.info("Published event to session", { sessionId, type, data });
  }
}

/** Shared, module-level bus instance used by the SSE route and publishers. */
export const sessionEventBus = SessionEventBus.getInstance();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// app/routes/api/sse/index.ts | |
import { createAPIFileRoute } from "@tanstack/react-start/api"; | |
import { getEvent } from "@tanstack/react-start/server"; | |
import { getSessionId } from "~/lib/server/http-utils"; | |
import { sessionEventBus } from "~/lib/server/sse/event-bus"; | |
import { logger } from "~/lib/logger/logger"; | |
/**
 * GET /api/sse — opens a Server-Sent Events stream for the caller's session.
 *
 * Responds 401 when no session id can be resolved. Otherwise every event
 * published to the session on `sessionEventBus` is forwarded to the client as
 * a `data: <json>` SSE frame, and a heartbeat frame is written once per second.
 */
export const APIRoute = createAPIFileRoute("/api/sse")({
  GET: async () => {
    const responseHeaders = {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    };

    const sessionId = await getSessionId();
    if (!sessionId) {
      return new Response("Unauthorized", { status: 401 });
    }

    const event = getEvent();
    const encoder = new TextEncoder();
    const heartbeatIntervalMs = 1000;

    // Assigned synchronously inside start(); lets cancel() release the same
    // resources as a socket close.
    let teardown: (() => void) | undefined;

    const stream = new ReadableStream<Uint8Array>({
      start(controller) {
        let closed = false;
        let heartbeatInterval: ReturnType<typeof setInterval> | undefined;

        const handleSessionEvent = (eventData: unknown): void => {
          send(JSON.stringify(eventData));
        };

        // Idempotent: may be reached from socket close, stream cancel, or a
        // failed write, in any order.
        const cleanup = (): void => {
          if (closed) {
            return;
          }
          closed = true;
          clearInterval(heartbeatInterval);
          sessionEventBus.unsubscribe(sessionId, handleSessionEvent);
        };
        teardown = cleanup;

        const send = (message: string): void => {
          if (closed) {
            return;
          }
          try {
            controller.enqueue(encoder.encode(`data: ${message}\n\n`));
          } catch {
            // enqueue throws once the stream is closed/cancelled. This runs
            // inside the bus's emit(), so swallow it here instead of letting
            // it bubble into whoever called publish().
            logger.debug("SSE write failed, releasing connection", {
              sessionId,
            });
            cleanup();
          }
        };

        // Subscribe before publishing "connected": the bus does not buffer,
        // so the reverse order means this client never sees its own
        // confirmation.
        sessionEventBus.subscribe(sessionId, handleSessionEvent);

        // Heartbeat is written straight to this connection rather than
        // published on the bus; going through the bus would fan each
        // connection's heartbeat out to every other connection of the session.
        heartbeatInterval = setInterval(() => {
          const timestamp = new Date().toISOString();
          send(
            JSON.stringify({
              type: "heartbeat",
              data: { timestamp, sessionId },
              timestamp,
            }),
          );
        }, heartbeatIntervalMs);

        // Handle client disconnection
        event.node.req.on("close", () => {
          logger.info("Client disconnected", { sessionId });
          cleanup();
          try {
            controller.close();
          } catch {
            // Stream was already closed or cancelled; nothing left to do.
          }
        });

        // Send initial connection confirmation
        sessionEventBus.publish(sessionId, "connected", { sessionId });
      },
      cancel() {
        logger.info("Stream cancelled", { sessionId });
        teardown?.();
      },
    });

    return new Response(stream, { headers: responseHeaders });
  },
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment