Skip to content

Instantly share code, notes, and snippets.

@jacobparis
Created April 1, 2025 03:09
Show Gist options
  • Save jacobparis/5c6f5c2c6a64cf2953b814e5ca71b772 to your computer and use it in GitHub Desktop.
Save jacobparis/5c6f5c2c6a64cf2953b814e5ca71b772 to your computer and use it in GitHub Desktop.
React Router MCP Server
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"
import { Socket } from "net"
import { Readable } from "stream"
import { IncomingMessage, type ServerResponse } from "http"
import {
subscribeToChannel,
unsubscribeFromChannel,
publishMessage,
} from "../redis.server.js"
import { eventStream } from "remix-utils/sse/server"
let servers: McpServer[] = []
class McpTransport extends SSEServerTransport {
send: (message: any) => Promise<void>
private response: ServerResponse
constructor(path: string, send: (message: any) => void) {
const response = {
setHeader: () => {},
getHeader: () => undefined,
writeHead: () => response,
write: () => true,
end: () => {},
once: () => response,
on: () => response,
off: () => response,
emit: () => true,
removeListener: () => response,
addListener: () => response,
statusCode: 200,
headersSent: false,
finished: false,
} as unknown as ServerResponse
super(path, response)
this.send = async (message) => {
send(message)
}
this.response = response
}
protected emitMessage(message: any) {
return this.send(message)
}
}
export function mcpStream(
serverFactory: () => McpServer,
options: { signal: AbortSignal },
) {
const server = serverFactory()
servers.push(server)
server.server.onclose = () => {
servers = servers.filter((s) => s !== server)
}
const logs: { type: "log" | "error"; messages: string[] }[] = []
const logInterval = setInterval(() => {
for (const log of logs) {
console[log.type].call(console, ...log.messages)
}
logs.length = 0
}, 100)
return eventStream(options.signal, function setup(send) {
const cleanupController = new AbortController()
const transport = new McpTransport("/mcp/message", (message) => {
send({
event: "message",
data: JSON.stringify(message),
})
})
const handleMessage = async (message: string) => {
logs.push({
type: "log",
messages: ["Received message from Redis", message],
})
const { requestId, url, method, headers, body } = JSON.parse(message)
let status = 100
let responseBody = ""
try {
await transport.handlePostMessage(
createRequest({ method, url, headers, body }),
createResponse((r) => {
status = r.status
responseBody = r.body
}),
)
await publishMessage(
`responses:${transport.sessionId}:${requestId}`,
JSON.stringify({
jsonrpc: "2.0",
id: requestId,
result: {
status,
body: responseBody,
},
}),
)
logs.push({
type: status >= 200 && status < 300 ? "log" : "error",
messages: [
`Request ${transport.sessionId}:${requestId} ${
status >= 200 && status < 300 ? "succeeded" : "failed"
}: ${responseBody}`,
],
})
} catch (error) {
await publishMessage(
`responses:${transport.sessionId}:${requestId}`,
JSON.stringify({
jsonrpc: "2.0",
id: requestId,
error: {
code: -32000,
message:
error instanceof Error ? error.message : "Internal error",
},
}),
)
logs.push({
type: "error",
messages: [
`Request ${transport.sessionId}:${requestId} failed with error: ${error}`,
],
})
}
}
const cleanup = async () => {
clearInterval(logInterval)
await unsubscribeFromChannel(
`requests:${transport.sessionId}`,
handleMessage,
)
await server.close()
servers = servers.filter((s) => s !== server)
cleanupController.abort()
}
// Set up the abort listener before any async operations
options.signal.addEventListener("abort", cleanup, { once: true })
cleanupController.signal.addEventListener("abort", cleanup, { once: true })
// Start the subscription and server connection
subscribeToChannel(`requests:${transport.sessionId}`, handleMessage).then(
() => {
return server.connect(transport)
},
)
return cleanup
})
}
function createRequest({
method = "GET",
url = "/",
headers = {},
body = null,
} = {}) {
const readable = new Readable()
readable._read = () => {}
if (body) {
readable.push(typeof body === "string" ? body : JSON.stringify(body))
readable.push(null)
}
const req = new IncomingMessage(new Socket())
req.method = method
req.url = url
req.headers = headers
req.push = readable.push.bind(readable)
req.read = readable.read.bind(readable)
req.on = readable.on.bind(readable)
req.pipe = readable.pipe.bind(readable)
return req
}
function createResponse(
callback: ({ status, body }: { status: number; body: string }) => void,
) {
return {
writeHead(code: number) {
callback({ status: code, body: "" })
return this
},
end(body: unknown) {
callback({ status: 200, body: body as string })
return this
},
} as unknown as ServerResponse
}
import { createClient } from "redis"
import { invariant } from "@epic-web/invariant"
import { remember } from "@epic-web/remember"
export const redis = remember("redis", createRedisClient)
export const redisPublisher = remember("redisPublisher", createRedisPublisher)
// Maximum duration for SSE connections in seconds
export const maxDuration = 60
export function createRedisClient() {
const redisUrl = process.env.REDIS_URL || process.env.KV_URL
invariant(redisUrl, "REDIS_URL or KV_URL environment variable is not set")
const client = createClient({ url: redisUrl })
client.on("error", (err) => {
console.error(`Redis error ${redisUrl}`, err)
})
// Connect immediately
void client.connect().catch((err) => {
console.error(`Failed to connect to ${redisUrl}`, err)
})
return client
}
export function createRedisPublisher() {
const redisUrl = process.env.REDIS_URL || process.env.KV_URL
invariant(redisUrl, "REDIS_URL or KV_URL environment variable is not set")
const client = createClient({ url: redisUrl })
client.on("error", (err) => {
console.error(`Redis publisher error ${redisUrl}`, err)
})
// Connect immediately
void client.connect().catch((err) => {
console.error(`Failed to connect to ${redisUrl}`, err)
})
return client
}
// Helper function to ensure Redis clients are connected
export async function ensureRedisConnected() {
const redisConnected = redis.isOpen ? Promise.resolve() : redis.connect()
const publisherConnected = redisPublisher.isOpen
? Promise.resolve()
: redisPublisher.connect()
try {
await Promise.race([
Promise.all([redisConnected, publisherConnected]),
new Promise((_, reject) =>
setTimeout(() => reject(new Error("Redis connection timeout")), 5000),
),
])
} catch (error) {
// Clean up connections if they were started
if (!redis.isOpen) {
await redis.disconnect()
}
if (!redisPublisher.isOpen) {
await redisPublisher.disconnect()
}
throw error
}
}
// Helper for publishing messages
export async function publishMessage(channel: string, message: unknown) {
await ensureRedisConnected()
return redisPublisher.publish(
channel,
typeof message === "string" ? message : JSON.stringify(message),
)
}
// Helper for subscribing to channels
export async function subscribeToChannel(
channel: string,
callback: (message: string) => void,
) {
await ensureRedisConnected()
return redis.subscribe(channel, callback)
}
// Helper for unsubscribing from channels
export async function unsubscribeFromChannel(
channel: string,
callback: (message: string) => void,
) {
if (redis.isOpen) {
return redis.unsubscribe(channel, callback)
}
}
import type { ActionFunctionArgs } from "react-router"
import type { IncomingHttpHeaders } from "http"
import {
ensureRedisConnected,
subscribeToChannel,
unsubscribeFromChannel,
publishMessage,
} from "../redis.server.js"
interface SerializedRequest {
requestId: string
url: string
method: string
body: string
headers: IncomingHttpHeaders
}
export async function action({ request }: ActionFunctionArgs) {
await ensureRedisConnected()
// Get the request body
const body = await request.text()
// Get the URL and extract sessionId
const url = new URL(request.url)
const sessionId = url.searchParams.get("sessionId") || ""
if (!sessionId) {
return new Response("No sessionId provided", { status: 400 })
}
const requestId = crypto.randomUUID()
const serializedRequest: SerializedRequest = {
requestId,
url: request.url,
method: request.method,
body: body,
headers: Object.fromEntries(
request.headers.entries(),
) as IncomingHttpHeaders,
}
// Create a controller to handle timeouts and aborts
const controller = new AbortController()
// Create a promise that resolves when we get a response
const responsePromise = new Promise<Response>((resolve, reject) => {
// Handles responses from the /sse endpoint
subscribeToChannel(`responses:${sessionId}:${requestId}`, (message) => {
controller.abort()
const response = JSON.parse(message) as {
status: number
body: string
}
resolve(new Response(response.body, { status: response.status }))
}).catch(reject)
// Set a timeout
setTimeout(() => {
controller.abort()
reject(new Response("Request timed out", { status: 408 }))
}, 10 * 1000)
})
// Queue the request in Redis so that a subscriber can pick it up
await publishMessage(`requests:${sessionId}`, serializedRequest)
// Handle cleanup when the request is aborted
const abortHandler = async () => {
controller.abort()
await unsubscribeFromChannel(
`responses:${sessionId}:${requestId}`,
() => {},
)
}
request.signal.addEventListener("abort", abortHandler)
try {
// Wait for the response or timeout
const response = await responsePromise
await unsubscribeFromChannel(
`responses:${sessionId}:${requestId}`,
() => {},
)
request.signal.removeEventListener("abort", abortHandler)
return response
} catch (error) {
await unsubscribeFromChannel(
`responses:${sessionId}:${requestId}`,
() => {},
)
request.signal.removeEventListener("abort", abortHandler)
if (error instanceof Response) {
return error
}
console.error("Error handling message:", error)
return new Response("Internal server error", { status: 500 })
}
}
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"
import { z } from "zod"
import type { LoaderFunctionArgs } from "react-router"
import { ensureRedisConnected } from "../redis.server.js"
import { mcpStream } from "#app/utils/mcp.server.js"
import { matchSorter } from "match-sorter"
import { cachified, lru } from "#app/cache.server.ts"
import { remember } from "@epic-web/remember"
import { getAllPkgConfigs } from "#app/db/queries/get-all-pkg-configs.ts"
const mcpServer = remember("mcp-server", () => {
const server = new McpServer(
{
name: "pkgless",
version: "0.1.0",
},
{
capabilities: {
tools: {
"templates-search": {
description: "Search for project templates based on criteria",
},
},
},
},
)
server.tool(
"templates-search",
{
query: z.string().optional(),
},
async ({ query }) => {
console.log(`[MCP] templates-search`, { query })
const templates = await cachified({
key: "pkg-configs",
cache: lru,
ttl: 60 * 10 * 1000, // 10 minutes
swr: Infinity,
async getFreshValue() {
console.log(`[Cache] Getting fresh value for pkg-configs in MCP`)
return getAllPkgConfigs()
},
})
let filteredTemplates = templates
if (query?.trim()) {
filteredTemplates = matchSorter(filteredTemplates, query.trim(), {
keys: [
"name",
"repo",
"environments",
"config.description",
"config.technologies",
],
})
}
return {
content: [
{
type: "text",
text: `Found ${filteredTemplates.length} matching templates`,
},
],
templates: filteredTemplates.map((template) => ({
repo: template.repo,
path: template.path,
installCommand: `npx degit ${template.repo}${template.path ? `/${template.path}` : ""}`,
})),
}
},
)
return server
})
export async function loader({ request }: LoaderFunctionArgs) {
await ensureRedisConnected()
const timeoutSignal = AbortSignal.any([
request.signal,
AbortSignal.timeout(30000),
])
return mcpStream(() => mcpServer, { signal: timeoutSignal })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment