Just implement MCP tool calling in a minimalistic agentic loop, and boom, you have full protocol support! (including elicitation, sampling, logging, events, all of it!)
-
-
Save ochafik/1b5977aad557aff9d6222cc1f124957d to your computer and use it in GitHub Desktop.
MCP Continuation Backfill
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| This example implements an stdio MCP proxy that backfills elicitations related to tool calls. | |
| It transforms the output schema to be the original output OR an elicitation request + continuation token, | |
| and transforms the input schema to accept the original inputs OR elicitation results + continuation token. | |
| Usage: | |
| npx -y @modelcontextprotocol/inspector \ | |
| npx -y --silent tsx continuation.ts -- \ | |
| npx -y --silent @modelcontextprotocol/server-everything | |
| */ | |
| import { StdioServerTransport } from '../../server/stdio.js'; | |
| import { StdioClientTransport } from '../../client/stdio.js'; | |
| import { | |
| CancelledNotification, | |
| CancelledNotificationSchema, | |
| isInitializeRequest, | |
| isJSONRPCRequest, | |
| ElicitRequest, | |
| ElicitRequestSchema, | |
| CreateMessageRequest, | |
| CreateMessageRequestSchema, | |
| CreateMessageResult, | |
| JSONRPCResponse, | |
| isInitializedNotification, | |
| CallToolRequest, | |
| CallToolRequestSchema, | |
| isJSONRPCNotification, | |
| LoggingMessageNotification, | |
| JSONRPCNotification, | |
| ElicitResult, | |
| ElicitResultSchema, | |
| Tool, | |
| RequestIdSchema, | |
| CallToolResultSchema, | |
| NotificationSchema, | |
| ServerRequestSchema, | |
| ServerNotificationSchema, | |
| ClientResultSchema, | |
| RequestId, | |
| isJSONRPCResponse, | |
| CallToolResult, | |
| ServerNotification, | |
| ServerRequest, | |
| JSONRPCRequest, | |
| ListToolsRequestSchema, | |
| ListToolsRequest, | |
| ListToolsResult, | |
| ListToolsResultSchema, | |
| JSONRPCMessageSchema, | |
| JSONRPCRequestSchema, | |
| PingRequestSchema, | |
| ProgressNotificationSchema, | |
| LoggingMessageNotificationSchema, | |
| ResourceUpdatedNotificationSchema, | |
| ResourceListChangedNotificationSchema, | |
| ToolListChangedNotificationSchema, | |
| PromptListChangedNotificationSchema, | |
| CreateMessageResultSchema, | |
| EmptyResultSchema, | |
| InitializedNotification, | |
| InitializeRequest, | |
| } from "../../types.js"; | |
| import { Transport } from "../../shared/transport.js"; | |
| import { Server } from "../../server/index.js"; | |
| import z from 'zod'; | |
| import { convertJsonSchemaToZod } from "zod-from-json-schema"; | |
| import { zodToJsonSchema } from "zod-to-json-schema"; | |
| import { request } from 'http'; | |
| import { id } from 'zod/v4/locales'; | |
| const DEFAULT_MAX_TOKENS = process.env.DEFAULT_MAX_TOKENS ? parseInt(process.env.DEFAULT_MAX_TOKENS) : 1000; | |
| // TODO: move to SDK | |
| const isCancelledNotification: (value: unknown) => value is CancelledNotification = | |
| ((value: any) => CancelledNotificationSchema.safeParse(value).success) as any; | |
| export const isCallToolRequest = (value: unknown): value is CallToolRequest => | |
| CallToolRequestSchema.safeParse(value).success; | |
| export const isCallToolResult = (value: unknown): value is CallToolResult => | |
| CallToolResultSchema.safeParse(value).success; | |
| export const isListToolsRequest = (value: unknown): value is ListToolsRequest => | |
| ListToolsRequestSchema.safeParse(value).success; | |
| export const isListToolsResult = (value: unknown): value is ListToolsResult => | |
| ListToolsResultSchema.safeParse(value).success; | |
| // const isCallToolRequest: (value: unknown) => value is CallToolRequest = | |
| // ((value: any) => CallToolRequestSchema.safeParse(value).success) as any; | |
| // const isElicitRequest: (value: unknown) => value is ElicitRequest = | |
| // ((value: any) => ElicitRequestSchema.safeParse(value).success) as any; | |
| // const isElicitResult: (value: unknown) => value is ElicitResult = | |
| // ((value: any) => ElicitResultSchema.safeParse(value).success) as any; | |
| const isCreateMessageRequest: (value: unknown) => value is CreateMessageRequest = | |
| ((value: any) => CreateMessageRequestSchema.safeParse(value).success) as any; | |
| export type NamedTransport<T extends Transport = Transport> = { | |
| name: 'client' | 'server', | |
| transport: T, | |
| } | |
| // const ContinuationIdSchema = RequestIdSchema; | |
| const buildContinuableOutputSchema = (outputSchema?: z.ZodTypeAny) => z.object({ | |
| outputOrContinuable: z.discriminatedUnion('type', [ | |
| z.object({ | |
| type: z.literal('output'), | |
| output: outputSchema ?? z.object({}).passthrough(), | |
| }), | |
| z.object({ | |
| type: z.literal('pausedWithCreateMessageRequest'), | |
| id: RequestIdSchema, | |
| request: CreateMessageRequestSchema, | |
| }).describe("The tool call was paused to request a new message from the client. The client should process the request, then MUST resume the tool call w/ a resumeWithCreateMessageResult argument."), | |
| z.object({ | |
| type: z.literal('pausedWithElicitRequest'), | |
| id: RequestIdSchema, | |
| request: ElicitRequestSchema, | |
| }).describe("The tool call was paused to request additional information from the client. The client should process the request, then MUST resume the tool call w/ a resumeWithElicitResult argument."), | |
| z.object({ | |
| type: z.literal('pausedWithResourceUpdatedNotification'), | |
| id: RequestIdSchema, | |
| notification: ResourceUpdatedNotificationSchema, | |
| }).describe("The tool call was paused to notify a resource update. The client should process the notification, then MUST resume the tool call w/ a resumeAfterNotification argument."), | |
| z.object({ | |
| type: z.literal('pausedWithResourceListChangedNotification'), | |
| id: RequestIdSchema, | |
| notification: ResourceListChangedNotificationSchema, | |
| }).describe("The tool call was paused to notify a resource list change. The client should process the notification, then MUST resume the tool call w/ a resumeAfterNotification argument."), | |
| z.object({ | |
| type: z.literal('pausedWithToolListChangedNotification'), | |
| id: RequestIdSchema, | |
| notification: ToolListChangedNotificationSchema, | |
| }).describe("The tool call was paused to notify a tool list change. The client should process the notification, then MUST resume the tool call w/ a resumeAfterNotification argument."), | |
| z.object({ | |
| type: z.literal('pausedWithPromptListChangedNotification'), | |
| id: RequestIdSchema, | |
| notification: PromptListChangedNotificationSchema, | |
| }).describe("The tool call was paused to notify a prompt list change. The client should process the notification, then MUST resume the tool call w/ a resumeAfterNotification argument."), | |
| z.object({ | |
| type: z.literal('pausedWithProgressNotification'), | |
| id: RequestIdSchema, | |
| notification: ProgressNotificationSchema, | |
| }).describe("The tool call was paused to notify progress. The client should process the notification, then MUST resume the tool call w/ a resumeAfterNotification argument."), | |
| z.object({ | |
| type: z.literal('pausedWithLoggingMessageNotification'), | |
| id: RequestIdSchema, | |
| notification: LoggingMessageNotificationSchema, | |
| }).describe("The tool call was paused to notify a log message. The client should process the notification, then MUST resume the tool call w/ a resumeAfterClientNotified argument."), | |
| ]), | |
| }); | |
| const GenericContinuableOutputSchema = buildContinuableOutputSchema(); | |
| const buildContinuableInputSchema = (inputSchema: z.ZodTypeAny) => z.object({ | |
| inputOrContinuation: z.discriminatedUnion('type', [ | |
| z.object({ | |
| type: z.literal('input'), | |
| input: inputSchema, | |
| }), | |
| // z.object({ | |
| // type: z.literal('resumeWithClientResult'), | |
| // result: ClientResultSchema.optional(), | |
| // serverRequestId: RequestIdSchema, | |
| // callToolId: RequestIdSchema, | |
| // }).describe("Resume a previous tool call (with id = callToolId) that was paused by a server request (with id = serverRequestId)."), | |
| z.object({ | |
| type: z.literal('resumeWithCreateMessageResult'), | |
| result: CreateMessageResultSchema.optional(), | |
| serverRequestId: RequestIdSchema, | |
| callToolRequestId: RequestIdSchema, | |
| }).describe("Resume a previous tool call request (with id = callToolRequestId) that returned a pausedWithCreateMessageRequest (with id = serverRequestId)."), | |
| z.object({ | |
| type: z.literal('resumeWithElicitResult'), | |
| result: ElicitResultSchema, | |
| serverRequestId: RequestIdSchema, | |
| callToolRequestId: RequestIdSchema, | |
| }).describe("Resume a previous tool call request (with id = callToolRequestId) that returned a pausedWithElicitRequest (with id = serverRequestId)."), | |
| z.object({ | |
| type: z.literal('resumeAfterNotification'), | |
| serverNotificationId: RequestIdSchema, | |
| callToolRequestId: RequestIdSchema, | |
| }).describe("Resume a previous tool call request (with id = callToolRequestId) that returned a pausedWith*Notification (with id = serverNotificationId)."), | |
| ]), | |
| }); | |
| const GenericContinuableInputSchema = buildContinuableInputSchema(z.object({}).passthrough()); | |
| const WaitForServerMessagesInputSchema = z.object({ | |
| waitTimeoutSeconds: z.number().min(0).optional().default(30).describe("Maximum time to wait for server messages, in seconds."), | |
| }).passthrough(); | |
| const WaitForServerMessagesOutputSchema = buildContinuableOutputSchema(z.literal('Timed Out')); | |
| const WaitForServerMessagesTool: Tool = { | |
| name: "waitForServerMessages", | |
| description: "Wait for messages (notifications or requests) from the MCP server and return them. Clients should keep calling this tool (and continually retry after it times out) to get spontaneous server requests / notifications.", | |
| inputSchema: zodToJsonSchema(WaitForServerMessagesInputSchema) as any, | |
| outputSchema: zodToJsonSchema(WaitForServerMessagesOutputSchema) as any, | |
| }; | |
| function transformTool(tool: Tool): Tool { | |
| return { | |
| ...tool, | |
| inputSchema: zodToJsonSchema(buildContinuableInputSchema(convertJsonSchemaToZod(tool.inputSchema)), { strictUnions: true }) as any, | |
| outputSchema: zodToJsonSchema(buildContinuableOutputSchema(tool.outputSchema && convertJsonSchemaToZod(tool.outputSchema)), { strictUnions: true }) as any, | |
| // z.object({ | |
| // inputsOrContinuation: z.union([ | |
| // // ...(Array.isArray(tool.inputSchema?.anyOf) ? tool.inputSchema.anyOf : [tool.inputSchema]), | |
| // convertJsonSchemaToZod(tool.inputSchema as any), | |
| // // ElicitResultSchema, | |
| // z.object({ | |
| // continuationFromRequestId: RequestIdSchema, | |
| // elicitationResult: ElicitResultSchema, | |
| // }), | |
| // ]), | |
| // }), | |
| // outputSchema: z.object({ | |
| // outputOrElicitation: z.union([ | |
| // // ...(Array.isArray(tool.outputSchema?.anyOf) ? tool.outputSchema.anyOf : [tool.outputSchema]), | |
| // // anyOf: [ | |
| // // ...(Array.isArray(tool.inputSchema?.anyOf) ? tool.inputSchema.anyOf : [tool.inputSchema]), | |
| // // ElicitRequestSchema, | |
| // // ] | |
| // // } | |
| }; | |
| } | |
| export async function setupBackfill(client: NamedTransport, server: NamedTransport) { | |
| let initialize: InitializeRequest['params'] | undefined; | |
| // let clientSupportsSampling: boolean | undefined; | |
| const pending: Record<RequestId, { | |
| pendingServerRequests: Record<RequestId, (JSONRPCRequest & ServerRequest)>, | |
| pendingServerNotifications: Record<RequestId, (JSONRPCNotification & ServerNotification)>, | |
| }> = {}; | |
| // const pendingCallToolRequestIds = new Set<RequestId>(); | |
| const propagateMessage = (source: NamedTransport, target: NamedTransport) => { | |
| source.transport.onmessage = async (message, _extra) => { | |
| if (isJSONRPCRequest(message)) { | |
| const sendInternalError = (errorMessage: string) => { | |
| console.error(`[proxy -> ${source.name}]: Error: ${errorMessage}`); | |
| source.transport.send({ | |
| jsonrpc: "2.0", | |
| id: message.id, | |
| error: { | |
| code: -32603, // Internal error | |
| message: errorMessage, | |
| }, | |
| }, {relatedRequestId: message.id}); | |
| }; | |
| if (isInitializeRequest(message)) { | |
| initialize = message.params; | |
| } else if (isCallToolRequest(message)) { | |
| // if (source.name !== 'client') { | |
| // sendInternalError("CallToolRequest can only be sent from client to server"); | |
| // return; | |
| // } | |
| pendingContinuableCallTools[message.id] = { | |
| pendingServerRequests: {}, | |
| }; | |
| const continuableArgs = GenericContinuableInputSchema.parse(message.params.arguments); | |
| switch (continuableArgs.inputOrContinuation.type) { | |
| case 'input': { | |
| break; | |
| } | |
| case 'continuation': { | |
| const pendingRequests = pendingContinuableCallTools[continuableArgs.inputOrContinuation.continuationId]; | |
| if (!pendingRequests) { | |
| sendInternalError(`Unknown continuationId in CallToolRequest with inputOrContinuation.type='continuation': ${continuableArgs.inputOrContinuation.continuationId}`); | |
| return; | |
| } | |
| const pending = pendingRequests[continuableArgs.inputOrContinuation.continuationId]; | |
| const result = continuableArgs.inputOrContinuation.result; | |
| if (!result && ) | |
| target.transport.send({ | |
| jsonrpc: "2.0", | |
| id: continuableArgs.inputOrContinuation.continuationId, | |
| result: result ?? { }, | |
| }, {relatedRequestId: message.id}); | |
| i[continuableArgs.inputOrContinuation.continuationId]; | |
| if (!pending) { | |
| sendInternalError(`Unknown continuationId in CallToolRequest with inputOrContinuation.type='continuation': ${continuableArgs.inputOrContinuation.continuationId}`); | |
| return; | |
| } | |
| if (continuableArgs.inputOrContinuation.type === 'continuation') { | |
| if (!continuableArgs.inputOrContinuation.continuationId) { | |
| sendInternalError("Missing continuationId in CallToolRequest with inputOrContinuation.type='continuation'"); | |
| return; | |
| } | |
| if (continuableArgs.inputOrContinuation.result) { | |
| if ('inputOrContinuation' in message.params) { | |
| if ('type' in message.params.inputOrContinuation && message.params.inputOrContinuation.type === 'continuation') { | |
| if (message.params.inputOrContinuation.type === 'continuation') { | |
| if (!message.params.inputOrContinuation.continuationId) { | |
| message.params.arguments = {} | |
| } | |
| } else if (isJSONRPCResponse(message)) { | |
| if (isListToolsResult(message)) { | |
| message.tools = [ | |
| WaitForServerMessagesTool, | |
| ...message.tools.map(transformTool), | |
| ]; | |
| // await target.transport.send({ | |
| // id: message.id, | |
| // jsonrpc: "2.0", | |
| // result: <ListToolsResult>{ | |
| // ...message, | |
| // tools: message.tools.map(transformTool), | |
| // }, | |
| // }); | |
| // return; | |
| } else if (isCallToolResult(message)) { | |
| // if (message.params.name === WaitForServerMessagesTool.name) { | |
| target.transport.send(<JSONRPCResponse>{ | |
| id: message.id, | |
| jsonrpc: "2.0", | |
| result: GenericContinuableOutputSchema.parse({ | |
| outputOrContinuable: { | |
| type: 'output', | |
| output: message.params, | |
| }, | |
| }), | |
| }); | |
| return; | |
| // return; | |
| const continuableResult = GenericContinuableOutputSchema.parse(message.contents); | |
| // Cancel pending server requests if any | |
| await Promise.all(Object.entries(pendingContinuableCallTools[message.id]?.pendingServerRequests ?? {}).map(([requestId, request]) => { | |
| return server.transport.send(<JSONRPCNotification & CancelledNotification>{ | |
| jsonrpc: "2.0", | |
| method: "notifications/cancelled", | |
| params: { | |
| requestId, | |
| } | |
| }, {relatedRequestId: requestId}); | |
| })); | |
| delete pendingContinuableCallTools[message.id]; | |
| } | |
| } else if (isJSONRPCNotification(message)) { | |
| if (isInitializedNotification(message) && source.name === 'server') { | |
| if (!clientSupportsSampling) { | |
| message.params = {...(message.params ?? {}), _meta: {...(message.params?._meta ?? {}), ...backfillMeta}}; | |
| } | |
| } | |
| } | |
| try { | |
| const relatedRequestId = isCancelledNotification(message)? message.params.requestId : undefined; | |
| await target.transport.send(message, {relatedRequestId}); | |
| } catch (error) { | |
| source.transport.send(<JSONRPCNotification & LoggingMessageNotification>{ | |
| jsonrpc: "2.0", | |
| method: "notifications/message", | |
| params: { | |
| type: "log_message", | |
| level: "error", | |
| message: `Error sending message to ${target.name}: ${(error as Error).message}`, | |
| } | |
| }); | |
| } | |
| }; | |
| }; | |
| propagateMessage(server, client); | |
| propagateMessage(client, server); | |
| const addErrorHandler = (transport: NamedTransport) => { | |
| transport.transport.onerror = async (error: Error) => { | |
| console.error(`[proxy]: Error from ${transport.name} transport:`, error); | |
| }; | |
| }; | |
| addErrorHandler(client); | |
| addErrorHandler(server); | |
| await server.transport.start(); | |
| await client.transport.start(); | |
| } | |
| async function main() { | |
| const args = process.argv.slice(2); | |
| const client: NamedTransport = {name: 'client', transport: new StdioClientTransport({command: args[0], args: args.slice(1)})}; | |
| const server: NamedTransport = {name: 'server', transport: new StdioServerTransport()}; | |
| const api = new Anthropic(); | |
| await setupBackfill(client, server, api); | |
| console.error("[proxy]: Transports started."); | |
| } | |
| main().catch((error) => { | |
| console.error("[proxy]: Fatal error:", error); | |
| process.exit(1); | |
| }); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "name": "micropilot", | |
| "version": "1.0.0", | |
| "description": "", | |
| "license": "ISC", | |
| "author": "", | |
| "type": "module", | |
| "bin": "dist/continuation.js", | |
| "scripts": { | |
| "prepare": "npm run build", | |
| "start": "bun run continuation.ts", | |
| "build": "bun build continuation.ts --outdir=dist --banner '#!/usr/bin/env node' --target=node --minify && shx chmod +x dist/continuation.js" | |
| }, | |
| "dependencies": { | |
| "@modelcontextprotocol/sdk": "^1.19.1", | |
| "zod-from-json-schema": "^0.5.0", | |
| "zod-to-json-schema": "^3.24.6" | |
| }, | |
| "devDependencies": { | |
| "bun": "^1.2.23", | |
| "shx": "^0.4.0" | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment