Skip to content

Instantly share code, notes, and snippets.

@imownbey
Created March 9, 2025 21:12
Show Gist options
  • Save imownbey/1941ac2a0056366ed73ed97db56333a9 to your computer and use it in GitHub Desktop.
Save imownbey/1941ac2a0056366ed73ed97db56333a9 to your computer and use it in GitHub Desktop.
Edge / Cloudflare Worker & Durable Object driven McpServer transport
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { JSONRPCMessage, JSONRPCMessageSchema } from '@modelcontextprotocol/sdk/types.js';
const MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024; // 4MB

/**
 * Server-side SSE transport built only on web-standard primitives
 * (`ReadableStream`, `Request`, `Response`, `TextEncoder`), which makes it
 * compatible with Cloudflare Workers and other edge environments.
 *
 * Server-to-client messages are written as SSE events into {@link stream};
 * client-to-server messages arrive through {@link handlePostMessage} (or
 * {@link handleMessage}) and are forwarded to `onmessage`.
 */
export class EdgeSSETransport implements Transport {
  private controller: ReadableStreamDefaultController<Uint8Array> | null = null;
  // One encoder is enough for the lifetime of the transport.
  private readonly encoder = new TextEncoder();
  readonly stream: ReadableStream<Uint8Array>;
  private closed = false;

  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;

  /**
   * Creates a new EdgeSSETransport, which will direct the MCP client to POST
   * messages to `messageUrl`.
   *
   * @param messageUrl URL announced to the client in the `endpoint` event.
   * @param sessionId Identifier appended to `messageUrl` as the `sessionId` query parameter.
   */
  constructor(
    private messageUrl: string,
    readonly sessionId: string,
  ) {
    // Create a readable stream for SSE
    this.stream = new ReadableStream<Uint8Array>({
      start: (controller) => {
        this.controller = controller;
      },
      // Invoked when the consumer of the stream cancels it.
      cancel: () => {
        if (this.closed) {
          return;
        }
        this.closed = true;
        this.onclose?.();
      },
    });
  }

  /**
   * Sends the initial `endpoint` event that tells the client where to POST.
   *
   * @throws Error if the transport is already closed or the stream controller is missing.
   */
  async start(): Promise<void> {
    if (this.closed) {
      throw new Error(
        'SSE transport already closed! If using Server class, note that connect() calls start() automatically.',
      );
    }
    // Make sure the controller exists
    if (!this.controller) {
      throw new Error('Stream controller not initialized');
    }
    // Append the session id with the right separator (the message URL may
    // already carry a query string) and encode it so it cannot break the URL.
    const separator = this.messageUrl.includes('?') ? '&' : '?';
    const endpointUrl = `${encodeURI(this.messageUrl)}${separator}sessionId=${encodeURIComponent(this.sessionId)}`;
    this.controller.enqueue(this.encoder.encode(`event: endpoint\ndata: ${endpointUrl}\n\n`));
  }

  /**
   * A `Response` that streams this transport's SSE events. The underlying
   * stream can only be consumed once.
   */
  get sseResponse(): Response {
    return new Response(this.stream, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      },
    });
  }

  /**
   * Handles an incoming POST request carrying one JSON-RPC message.
   *
   * @returns 202 when the message was accepted, 400 when the request or the
   * message is invalid, 500 when the SSE stream is not (or no longer) open.
   */
  async handlePostMessage(req: Request): Promise<Response> {
    if (this.closed || !this.controller) {
      const message = 'SSE connection not established';
      return new Response(message, { status: 500 });
    }

    let body: unknown;
    try {
      const contentType = req.headers.get('content-type') ?? '';
      if (!contentType.includes('application/json')) {
        throw new Error(`Unsupported content-type: ${contentType}`);
      }
      // Cheap early rejection based on the declared size.
      const declaredLength = parseInt(req.headers.get('content-length') ?? '0', 10);
      if (declaredLength > MAXIMUM_MESSAGE_SIZE) {
        throw new Error(`Request body too large: ${declaredLength} bytes`);
      }
      // The header may be absent or wrong, so enforce the limit on the
      // actual payload as well.
      const raw = await req.text();
      const actualLength = this.encoder.encode(raw).byteLength;
      if (actualLength > MAXIMUM_MESSAGE_SIZE) {
        throw new Error(`Request body too large: ${actualLength} bytes`);
      }
      body = JSON.parse(raw);
    } catch (error) {
      const failure = this.toError(error);
      this.onerror?.(failure);
      return new Response(String(failure), { status: 400 });
    }

    try {
      await this.handleMessage(body);
    } catch (error) {
      // handleMessage has already reported this error through onerror;
      // reporting it again here would notify the listener twice.
      return new Response(String(error), { status: 400 });
    }
    return new Response('Accepted', { status: 202 });
  }

  /**
   * Handle a client message, regardless of how it arrived. This can be used
   * to inform the server of messages that arrive via a means different than
   * HTTP POST.
   *
   * Any failure (schema validation or a throwing `onmessage` handler) is
   * reported once through `onerror` and then rethrown.
   */
  async handleMessage(message: unknown): Promise<void> {
    try {
      const parsedMessage: JSONRPCMessage = JSONRPCMessageSchema.parse(message);
      this.onmessage?.(parsedMessage);
    } catch (error) {
      this.onerror?.(this.toError(error));
      throw error;
    }
  }

  /**
   * Closes the SSE stream and notifies `onclose` exactly once. Calling it
   * again, or after the consumer cancelled the stream, is a no-op.
   */
  async close(): Promise<void> {
    if (this.closed) {
      return;
    }
    // Mark as closed first so re-entrant calls cannot fire onclose twice.
    this.closed = true;
    // Closing the controller ends the stream for the reader after queued
    // events are delivered. stream.cancel() is deliberately not called: the
    // stream is normally locked by the Response consumer, in which case
    // cancel() returns a rejected promise that nobody handles.
    this.controller?.close();
    this.onclose?.();
  }

  /**
   * Sends a JSON-RPC message to the client as an SSE `message` event.
   *
   * @throws Error if the transport is closed.
   */
  async send(message: JSONRPCMessage): Promise<void> {
    if (this.closed || !this.controller) {
      throw new Error('Not connected');
    }
    const messageText = `event: message\ndata: ${JSON.stringify(message)}\n\n`;
    this.controller.enqueue(this.encoder.encode(messageText));
  }

  /** Normalizes an unknown thrown value into an Error for `onerror`. */
  private toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error));
  }
}
import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js';
import { DurableObject } from 'cloudflare:workers';
import { EdgeSSETransport } from './EdgeSSETransport';
import { Hono } from 'hono';
// ---- Define McpServer
/**
 * Builds the demo MCP server ("test-mcp", version 1.0.0) and registers its
 * single templated resource, `greeting://{name}`.
 */
export function createMcpServer(): McpServer {
  const mcp = new McpServer({ name: 'test-mcp', version: '1.0.0' });

  // Template for the dynamic greeting resource.
  const greetingTemplate = new ResourceTemplate('greeting://{name}', { list: undefined });

  mcp.resource('greeting', greetingTemplate, async (uri, { name }) => {
    const greeting = {
      uri: uri.href,
      text: `Hello, ${name}!`,
    };
    return { contents: [greeting] };
  });

  return mcp;
}
//------ Define DurableObject
/**
 * Durable Object that owns one MCP server and the SSE transport of its
 * current session.
 *
 * - `GET  .../sse`     opens a fresh SSE stream and connects the server to it.
 * - `POST .../message` delivers a client message to the open transport.
 */
export class McpObject extends DurableObject {
  private transport?: EdgeSSETransport;
  private server: McpServer;

  constructor(ctx: DurableObjectState, env: any) {
    super(ctx, env);
    this.server = createMcpServer();
  }

  override async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (request.method === 'GET' && url.pathname.endsWith('/sse')) {
      // A transport's stream can be handed to only one Response, and a
      // closed transport cannot be restarted, so every SSE connection gets
      // its own transport; any previous one is closed first.
      await this.transport?.close();

      // Swap only the trailing "/sse" segment; a plain replace('sse', ...)
      // would rewrite the first "sse" anywhere in the path.
      const messagePath = url.pathname.replace(/\/sse$/, '/message');
      const transport = new EdgeSSETransport(`${url.origin}${messagePath}`, this.ctx.id.toString());
      this.transport = transport;

      await this.server.connect(transport);
      return transport.sseResponse;
    }

    if (request.method === 'POST' && url.pathname.endsWith('/message')) {
      // Without an SSE connection there is nowhere to send the reply, so
      // reject instead of accepting a message that would be dropped.
      if (!this.transport) {
        return new Response('SSE connection not established', { status: 500 });
      }
      return this.transport.handlePostMessage(request);
    }

    return new Response('Not found', { status: 404 });
  }
}
//---------- Define worker
const app = new Hono();

// Routes every /mcp/* request to the Durable Object of its session. A request
// without a `sessionId` query parameter is given a brand-new object.
app.all('/mcp/*', async (c) => {
  const sessionId = c.req.query('sessionId');
  const namespace = c.env.MCP_OBJECT;

  let objectId;
  try {
    objectId = sessionId ? namespace.idFromString(sessionId) : namespace.newUniqueId();
  } catch {
    // idFromString throws on a malformed id; the value comes straight from
    // the query string, so answer with a client error instead of a 500.
    return new Response('Invalid sessionId', { status: 400 });
  }

  return namespace.get(objectId).fetch(c.req.raw);
});

export default {
  fetch: app.fetch,
} satisfies ExportedHandler<Env>;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment