Skip to content

Instantly share code, notes, and snippets.

@tkh44
Created March 6, 2025 19:02
Show Gist options
  • Save tkh44/097414f26abb7637f69ee730499287ff to your computer and use it in GitHub Desktop.
Save tkh44/097414f26abb7637f69ee730499287ff to your computer and use it in GitHub Desktop.
demo simple http server for llms and proxies
import * as http from 'http';
import * as url from 'url';
import * as https from 'https';
import { Readable } from 'stream';
// Define types
interface Request extends http.IncomingMessage {
params?: Record<string, string>;
startTime?: number;
}
interface Response extends http.ServerResponse {
end: (...args: any[]) => Promise<void>;
}
type Middleware = (req: Request, res: Response) => Promise<void> | void;
type RouteHandler = (req: Request, res: Response) => Promise<void> | void;
interface Route {
pattern: string;
handler: RouteHandler;
}
interface ProxyOptions {
url: string;
method?: string;
headers?: Record<string, string>;
body?: string;
}
interface StreamCallbacks {
onStart?: (req: Request, res: Response) => void; // Before proxy request starts
onResponse?: (statusCode: number, headers: http.IncomingHttpHeaders) => void; // On proxy response
onChunk?: (chunk: string) => void; // On raw chunk received
onMessage?: (message: string) => void; // On new message (newline-delimited)
onDone?: () => void; // On [DONE] or stream end
onError?: (err: Error) => void; // On stream or request error
}
// Router class with enhanced streaming
class Router {
private routes: Record<string, Route[]> = { 'GET': [] };
private middlewares: Middleware[] = [];
public use(middleware: Middleware): void {
this.middlewares.push(middleware);
}
public route(method: string, pattern: string, handler: RouteHandler): void {
if (!this.routes[method]) {
this.routes[method] = [];
}
this.routes[method].push({ pattern, handler });
}
private matchRoute(method: string, path: string): { handler: RouteHandler; params: Record<string, string> } | null {
const methodRoutes = this.routes[method] || [];
for (const route of methodRoutes) {
const { pattern } = route;
const patternParts = pattern.split('/').filter(Boolean);
const pathParts = path.split('/').filter(Boolean);
if (patternParts.length !== pathParts.length) continue;
const params: Record<string, string> = {};
let isMatch = true;
for (let i = 0; i < patternParts.length; i++) {
if (patternParts[i].startsWith(':')) {
const paramName = patternParts[i].slice(1);
params[paramName] = pathParts[i];
} else if (patternParts[i] === '*') {
continue;
} else if (patternParts[i] !== pathParts[i]) {
isMatch = false;
break;
}
}
if (isMatch) {
return { handler: route.handler, params };
}
}
return null;
}
private *middlewareGenerator(req: Request, res: Response, handlers: Array<Middleware | RouteHandler>): Generator<Promise<void> | void> {
for (const handler of handlers) {
yield handler(req, res);
}
}
private async executeMiddlewares(req: Request, res: Response, handlers: Array<Middleware | RouteHandler>): Promise<void> {
const originalEnd = res.end;
let ended = false;
res.end = (...args: any[]): Promise<void> => {
if (!ended) {
ended = true;
originalEnd.apply(res, args);
}
return Promise.resolve();
};
const iterator = this.middlewareGenerator(req, res, handlers);
async function step(result: IteratorResult<Promise<void> | void> = iterator.next()): Promise<void> {
if (result.done) return;
try {
const { value } = result;
await value;
await step(iterator.next());
} catch (err) {
res.writeHead(500, { 'Content-Type': 'text/plain' });
await res.end(`Internal Server Error: ${(err as Error).message}`);
}
}
await step();
}
// Enhanced proxyStream with callbacks
public async proxyStream(req: Request, res: Response, options: ProxyOptions, callbacks: StreamCallbacks = {}): Promise<void> {
return new Promise((resolve, reject) => {
callbacks.onStart?.(req, res);
const proxyReq = https.request(options.url, {
method: options.method || 'GET',
headers: {
...options.headers,
'Accept': 'text/event-stream',
},
}, (proxyRes) => {
callbacks.onResponse?.(proxyRes.statusCode || 200, proxyRes.headers);
res.writeHead(proxyRes.statusCode || 200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
const transformStream = new Readable({ read() {} });
let buffer = ''; // Buffer for partial messages
proxyRes.on('data', (chunk: Buffer) => {
const data = chunk.toString();
callbacks.onChunk?.(data);
buffer += data;
const lines = buffer.split('\n');
// Process complete lines, leave incomplete in buffer
for (let i = 0; i < lines.length - 1; i++) {
const line = lines[i].trim();
if (line === '[DONE]') {
callbacks.onDone?.();
transformStream.push(null); // End stream
return;
}
if (line.startsWith('data: ')) {
const message = line.slice(6); // Strip "data: "
if (message) {
callbacks.onMessage?.(message);
transformStream.push(`data: ${message}\n\n`);
}
}
}
buffer = lines[lines.length - 1]; // Keep incomplete line
});
proxyRes.on('end', () => {
if (buffer.trim() === '[DONE]') {
callbacks.onDone?.();
}
transformStream.push(null);
resolve();
});
proxyRes.on('error', (err) => {
callbacks.onError?.(err);
transformStream.destroy(err);
reject(err);
});
transformStream.pipe(res);
});
if (options.body) {
proxyReq.write(options.body);
}
proxyReq.end();
proxyReq.on('error', (err) => {
callbacks.onError?.(err);
reject(err);
});
});
}
public createServer(): http.Server {
return http.createServer(async (req: Request, res: Response) => {
const parsedUrl = url.parse(req.url || '', true);
const path = parsedUrl.pathname || '/';
const method = req.method || 'GET';
const match = this.matchRoute(method, path);
if (match) {
req.params = match.params;
const handlers = [...this.middlewares, match.handler];
await this.executeMiddlewares(req, res, handlers);
} else {
await this.executeMiddlewares(req, res, [...this.middlewares, this.notFound]);
}
});
}
private async notFound(req: Request, res: Response): Promise<void> {
res.writeHead(404, { 'Content-Type': 'text/plain' });
await res.end('404 Not Found');
}
}
// Exports
export { Router, Request, Response, Middleware, RouteHandler, ProxyOptions, StreamCallbacks };
// Example usage
const messages = ["Hello World!", "Greetings!", "Hi There!", "Hola!", "Bonjour!"];
const router = new Router();
router.use(async (req: Request, res: Response) => {
console.log(`[${req.method}] ${req.url} - ${new Date().toISOString()}`);
await new Promise(resolve => setTimeout(resolve, 100));
});
router.use(async (req: Request, res: Response) => {
req.startTime = Date.now();
await new Promise(resolve => setTimeout(resolve, 50));
res.setHeader('X-Response-Time', `${Date.now() - req.startTime}ms`);
});
router.route('GET', '/', async (req: Request, res: Response) => {
res.writeHead(200, { 'Content-Type': 'text/html' });
await res.end(`
<!DOCTYPE html>
<html>
<head>
<title>HTMX Hello World</title>
<script src="https://unpkg.com/[email protected]"></script>
</head>
<body>
<div id="message">Hello World!</div>
<button hx-get="/get-new-message" hx-target="#message" hx-swap="innerHTML">Change Message</button>
<button hx-get="/stream-llm" hx-target="#message" hx-swap="innerHTML">Stream LLM</button>
</body>
</html>
`);
});
router.route('GET', '/get-new-message', async (req: Request, res: Response) => {
const randomMessage = messages[Math.floor(Math.random() * messages.length)];
res.writeHead(200, { 'Content-Type': 'text/plain' });
await new Promise(resolve => setTimeout(resolve, 50));
await res.end(randomMessage);
});
// Streaming LLM proxy with callbacks
router.route('GET', '/stream-llm', async (req: Request, res: Response) => {
await router.proxyStream(req, res, {
url: 'https://api.openai.com/v1/chat/completions', // Replace with mock or real endpoint
method: 'POST',
headers: {
'Authorization': 'Bearer YOUR_OPENAI_KEY',
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'gpt-3.5-turbo',
messages: [{ role: 'user', content: 'Tell me a story' }],
stream: true,
}),
}, {
onStart: (req, res) => console.log(`Starting stream for ${req.url}`),
onResponse: (status, headers) => console.log(`Response: ${status}`, headers),
onChunk: (chunk) => console.log(`Raw chunk: ${chunk}`),
onMessage: (message) => console.log(`New message: ${message}`),
onDone: () => console.log('Stream completed'),
onError: (err) => console.error(`Stream error: ${err.message}`),
});
});
// Start server
const server = router.createServer();
const port = 3000;
server.listen(port, () => {
console.log(`Server running at http://localhost:${port}`);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment