Last active
June 22, 2026 15:20
-
-
Save major/fb6fc6691fb53e1e4153ce2c75152711 to your computer and use it in GitHub Desktop.
Pi coding agent extension: Anthropic Claude models via Google Vertex AI (streamRawPredict)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Pi extension: Google Vertex AI → Anthropic Claude | |
| * | |
| * Routes Claude model requests through the Vertex AI streamRawPredict endpoint | |
| * instead of the direct Anthropic API. This lets you use Claude models under | |
| * GCP billing and IAM — no Anthropic API key required. | |
| * | |
| * Auth: Application Default Credentials via `gcloud auth application-default`. | |
| * The token is cached for 45 minutes (GCP ADC tokens expire after 60). | |
| * | |
| * Env vars (at least one from each pair required): | |
| * GOOGLE_CLOUD_PROJECT / GCLOUD_PROJECT | |
| * VERTEX_ANTHROPIC_LOCATION / GOOGLE_CLOUD_LOCATION | |
| * | |
| * The extension registers a custom streaming implementation (streamSimple) | |
| * because Vertex wraps the Anthropic Messages API behind its own endpoint and | |
| * auth scheme — pi's built-in anthropic-messages provider can't talk to it | |
| * directly. | |
| * | |
| * Costs are zero because Vertex billing is handled at the GCP project level, | |
| * not per-token via the Anthropic pricing API. | |
| */ | |
| import type { ExtensionAPI } from "@earendil-works/pi-coding-agent"; | |
| import { | |
| calculateCost, | |
| createAssistantMessageEventStream, | |
| type AssistantMessage, | |
| type AssistantMessageEventStream, | |
| type Context, | |
| type Model, | |
| type SimpleStreamOptions, | |
| type ToolCall, | |
| } from "@earendil-works/pi-ai"; | |
| import { execFile } from "node:child_process"; | |
/**
 * Capability baseline applied to every registered model. Each MODELS entry is
 * spread over this object, so an entry only needs to list the fields that
 * differ (older models have a smaller context window and output cap).
 */
const MODEL_DEFAULTS = {
  reasoning: true,
  contextWindow: 1e6,
  maxTokens: 128e3,
} as const;
/**
 * Maps pi thinking levels to Anthropic `budget_tokens` values.
 *
 * Any level missing from this table falls back to a 10240-token budget in
 * streamVertexAnthropic. An unlisted higher level would therefore silently
 * think less than `high`, so every level pi can request should appear here.
 * `xhigh` is listed for that reason.
 * NOTE(review): confirm the set of levels emitted by the installed pi version.
 */
const THINKING_BUDGETS: Record<string, number> = {
  minimal: 1024,
  low: 4096,
  medium: 10240,
  high: 20480,
  xhigh: 32768,
};
/** Limits shared by the model generation that predates the MODEL_DEFAULTS window/output sizes. */
const PRE_1M_LIMITS = { contextWindow: 200_000, maxTokens: 64_000 };

/**
 * Claude models exposed through Vertex. `id` is the Vertex model id (it is
 * interpolated into the streamRawPredict URL); fields not given here come
 * from MODEL_DEFAULTS at registration time.
 */
const MODELS = [
  { id: "claude-haiku-4-5@20251001", name: "Claude Haiku 4.5 (Vertex)", ...PRE_1M_LIMITS },
  { id: "claude-sonnet-4-5@20250929", name: "Claude Sonnet 4.5 (Vertex)", ...PRE_1M_LIMITS },
  { id: "claude-sonnet-4-6@default", name: "Claude Sonnet 4.6 (Vertex)" },
  { id: "claude-opus-4-5@20251101", name: "Claude Opus 4.5 (Vertex)", ...PRE_1M_LIMITS },
  { id: "claude-opus-4-6@default", name: "Claude Opus 4.6 (Vertex)" },
  { id: "claude-opus-4-7@default", name: "Claude Opus 4.7 (Vertex)" },
  { id: "claude-opus-4-8@default", name: "Claude Opus 4.8 (Vertex)" },
];
/**
 * ADC access-token cache. GCP access tokens live ~60 minutes. A cached token is
 * reused for at most 45 minutes, so it is never close to expiry when a request
 * starts. The token is fetched asynchronously via `gcloud` so the event loop
 * is not blocked.
 */
const TOKEN_TTL_MS = 45 * 60 * 1000;
/** Upper bound for one `gcloud` call, so a stuck CLI fails the request instead of hanging it. */
const GCLOUD_TIMEOUT_MS = 30_000;
let cachedToken: { value: string; expiresAt: number } | undefined;

/**
 * Returns an Application Default Credentials access token. The cached token
 * is used while it is fresh; otherwise a new one is requested from `gcloud`.
 *
 * @returns the bearer token string (trimmed `gcloud` stdout).
 * @throws Error if `gcloud` exits non-zero, exceeds GCLOUD_TIMEOUT_MS, or
 *   prints no token. Nothing is cached on failure, so the next call retries.
 */
async function getAccessToken(): Promise<string> {
  if (cachedToken && Date.now() < cachedToken.expiresAt) {
    return cachedToken.value;
  }
  const value = await new Promise<string>((resolve, reject) => {
    execFile(
      "gcloud",
      ["auth", "application-default", "print-access-token"],
      // When the timeout is exceeded, execFile kills the child and reports an error.
      { encoding: "utf8", timeout: GCLOUD_TIMEOUT_MS },
      (err, stdout) => (err ? reject(err) : resolve(stdout.trim())),
    );
  });
  if (!value) {
    // Never cache an empty token: every request for the next 45 minutes would
    // otherwise send `Bearer ` and fail with an opaque 401.
    throw new Error("gcloud auth application-default print-access-token printed no token");
  }
  cachedToken = { value, expiresAt: Date.now() + TOKEN_TTL_MS };
  return value;
}
/**
 * Extension entry point: registers the `google-vertex-anthropic` provider.
 * Each MODELS entry is layered over MODEL_DEFAULTS, accepts text and image
 * input, and is priced at zero (Vertex bills at the GCP project level).
 * Requests are served by streamVertexAnthropic.
 */
export default function (pi: ExtensionAPI) {
  const providerModels = MODELS.map((entry) => ({
    ...MODEL_DEFAULTS,
    ...entry,
    input: ["text", "image"] as const,
    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
  }));

  pi.registerProvider("google-vertex-anthropic", {
    name: "Google Vertex Anthropic",
    api: "vertex-anthropic" as any,
    baseUrl: "https://aiplatform.googleapis.com",
    // Not read by streamVertexAnthropic, which sends an ADC bearer token instead.
    // NOTE(review): presumably present because pi expects a key to be configured.
    apiKey: "vertex-adc",
    streamSimple: streamVertexAnthropic,
    models: providerModels,
  });
}
/** Anthropic's minimum accepted `budget_tokens`. */
const MIN_THINKING_BUDGET = 1024;
/** Visible-output headroom kept above the thinking budget when max_tokens has to grow. */
const THINKING_OUTPUT_HEADROOM = 1024;

/**
 * Reconciles `max_tokens` with an extended-thinking budget.
 *
 * The Messages API requires `budget_tokens < max_tokens`. When the requested
 * max_tokens is too small, it is raised to budget + headroom, capped at the
 * model's output limit. If the cap still leaves no room, the budget is shrunk
 * instead, but never below the 1024-token API minimum.
 *
 * @param requestedMaxTokens max_tokens taken from options or the model default.
 * @param modelMaxTokens the model's output-token ceiling.
 * @param budget desired thinking budget.
 * @returns the `max_tokens` and `budget_tokens` values to send.
 */
function fitThinkingBudget(
  requestedMaxTokens: number,
  modelMaxTokens: number,
  budget: number,
): { maxTokens: number; budgetTokens: number } {
  if (budget < requestedMaxTokens) {
    return { maxTokens: requestedMaxTokens, budgetTokens: budget };
  }
  const maxTokens = Math.max(
    requestedMaxTokens,
    Math.min(budget + THINKING_OUTPUT_HEADROOM, modelMaxTokens),
  );
  const budgetTokens =
    budget < maxTokens ? budget : Math.max(MIN_THINKING_BUDGET, maxTokens - THINKING_OUTPUT_HEADROOM);
  return { maxTokens, budgetTokens };
}

/**
 * Custom streaming implementation for the Vertex Anthropic endpoint.
 *
 * Posts a raw Anthropic Messages API body to Vertex's streamRawPredict endpoint
 * and translates the response into pi assistant-message events. The response is
 * normally SSE in the same format as the direct Anthropic API. When the
 * content-type is `application/json`, the complete message is replayed through
 * handleJsonResponse instead.
 *
 * Extended thinking:
 * - max_tokens and budget_tokens are reconciled with fitThinkingBudget.
 * - `temperature` is not sent while thinking is enabled, because Anthropic
 *   rejects non-default temperatures in that mode.
 *
 * The async work runs in a detached IIFE so the stream object can be returned
 * synchronously — this is the pattern all pi providers follow. Any failure
 * (config, auth, HTTP, parsing) ends the stream with a single `error` event.
 * Its stopReason is "aborted" if options.signal was aborted, else "error".
 */
function streamVertexAnthropic(
  model: Model<any>,
  context: Context,
  options: SimpleStreamOptions = {},
) {
  const stream = createAssistantMessageEventStream();
  const output: AssistantMessage = {
    role: "assistant",
    content: [],
    api: model.api,
    provider: model.provider,
    model: model.id,
    usage: {
      input: 0,
      output: 0,
      cacheRead: 0,
      cacheWrite: 0,
      totalTokens: 0,
      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
    },
    stopReason: "stop",
    timestamp: Date.now(),
  };
  (async () => {
    try {
      const project = process.env.GOOGLE_CLOUD_PROJECT || process.env.GCLOUD_PROJECT;
      const location = process.env.VERTEX_ANTHROPIC_LOCATION || process.env.GOOGLE_CLOUD_LOCATION;
      if (!project || !location) {
        throw new Error(
          "Set GOOGLE_CLOUD_PROJECT (or GCLOUD_PROJECT) and VERTEX_ANTHROPIC_LOCATION (or GOOGLE_CLOUD_LOCATION)",
        );
      }
      let body: any = {
        anthropic_version: "vertex-2023-10-16",
        max_tokens: options.maxTokens ?? model.maxTokens,
        messages: toAnthropicMessages(context),
        ...(context.systemPrompt ? { system: context.systemPrompt } : {}),
        ...(context.tools?.length
          ? {
              tools: context.tools.map((t) => ({
                name: t.name,
                description: t.description,
                input_schema: t.parameters,
              })),
            }
          : {}),
      };
      if (options.reasoning && model.reasoning) {
        const customBudget =
          options.thinkingBudgets?.[options.reasoning as keyof typeof options.thinkingBudgets];
        const fitted = fitThinkingBudget(
          body.max_tokens,
          model.maxTokens,
          customBudget ?? THINKING_BUDGETS[options.reasoning] ?? 10240,
        );
        // budget_tokens must stay below max_tokens or the request is rejected.
        body.max_tokens = fitted.maxTokens;
        body.thinking = {
          type: "enabled",
          budget_tokens: fitted.budgetTokens,
        };
      }
      // Thinking mode only accepts the default temperature, so it is omitted there.
      if (options.temperature !== undefined && !body.thinking) body.temperature = options.temperature;
      if (options.onPayload) body = (await options.onPayload(body, model)) ?? body;
      const host =
        location === "global"
          ? "aiplatform.googleapis.com"
          : `${location}-aiplatform.googleapis.com`;
      const url = `https://${host}/v1/projects/${project}/locations/${location}/publishers/anthropic/models/${model.id}:streamRawPredict`;
      const accessToken = await getAccessToken();
      const response = await fetch(url, {
        method: "POST",
        headers: {
          authorization: `Bearer ${accessToken}`,
          "content-type": "application/json",
          ...options.headers,
        },
        body: JSON.stringify(body),
        signal: options.signal,
      });
      await options.onResponse?.(
        { status: response.status, headers: Object.fromEntries(response.headers.entries()) },
        model,
      );
      if (!response.ok) {
        throw new Error(`${response.status} ${response.statusText}: ${await response.text()}`);
      }
      if (!response.body) {
        throw new Error("Vertex returned no response body");
      }
      stream.push({ type: "start", partial: output });
      if (response.headers.get("content-type")?.includes("application/json")) {
        handleJsonResponse(stream, output, await response.json());
      } else {
        await parseSse(response.body, (event) => handleEvent(stream, output, event));
      }
      calculateCost(model, output.usage);
      stream.push({
        type: "done",
        reason: output.stopReason as "stop" | "length" | "toolUse",
        message: output,
      });
      stream.end();
    } catch (error) {
      output.stopReason = options.signal?.aborted ? "aborted" : "error";
      output.errorMessage = error instanceof Error ? error.message : String(error);
      stream.push({ type: "error", reason: output.stopReason, error: output });
      stream.end();
    }
  })();
  return stream;
}
/**
 * Converts pi's internal message history to the Anthropic Messages API
 * `messages` array.
 *
 * Mappings:
 * - user messages pass through: string content as-is, arrays via contentPart.
 * - assistant messages map each block via contentPart; thinking blocks keep
 *   their signatures for replay.
 * - toolResult messages become `tool_result` blocks inside a user turn, since
 *   Anthropic requires tool results inside user turns.
 *   - Consecutive tool results (e.g. answers to parallel tool calls) are
 *     grouped into ONE user turn, so all results for an assistant turn sit in
 *     the message right after it.
 *   - Text-only results are sent as a single newline-joined string. Results
 *     containing images are sent as a block array so the image reaches the
 *     model.
 */
function toAnthropicMessages(context: Context) {
  const messages: any[] = [];
  // Content array of the user turn currently collecting tool results, or
  // undefined when the previous history entry was not a tool result.
  let toolResultTurn: any[] | undefined;
  for (const m of context.messages) {
    if (m.role !== "toolResult") {
      toolResultTurn = undefined;
    }
    if (m.role === "user") {
      messages.push({
        role: "user",
        content: typeof m.content === "string" ? m.content : m.content.map(contentPart),
      });
    } else if (m.role === "assistant") {
      messages.push({
        role: "assistant",
        content: m.content.map(contentPart),
      });
    } else if (m.role === "toolResult") {
      if (!toolResultTurn) {
        toolResultTurn = [];
        messages.push({ role: "user", content: toolResultTurn });
      }
      toolResultTurn.push({
        type: "tool_result",
        tool_use_id: m.toolCallId,
        content: toolResultContent(m.content),
        is_error: m.isError,
      });
    }
  }
  return messages;
}

/**
 * Builds the `content` value of a tool_result block.
 *
 * Text-only results keep the newline-joined string form. Results that contain
 * an image become a block array (text + image via contentPart). Empty text
 * parts are dropped from that array because the Messages API rejects empty
 * text blocks.
 */
function toolResultContent(parts: any[]) {
  if (!parts.some((c) => c.type === "image")) {
    return parts.map((c) => (c.type === "text" ? c.text : "[image]")).join("\n");
  }
  return parts
    .filter((c) => c.type === "image" || (c.type === "text" && c.text))
    .map(contentPart);
}
/**
 * Translates one pi content block into its Anthropic block shape.
 *
 * - text     → `{ type: "text", text }`
 * - image    → base64 image source; pi's `mimeType` becomes `media_type`
 * - toolCall → `tool_use`; pi's `arguments` become `input`
 * - thinking → `thinking`, with `thinkingSignature` sent as `signature`.
 *   A missing signature (e.g. from older sessions) is sent as "". The author
 *   reports Vertex accepts this.
 *   NOTE(review): Anthropic validates replayed thinking signatures — confirm
 *   this still holds.
 * - anything else → an empty text block
 */
function contentPart(c: any) {
  switch (c.type) {
    case "text":
      return { type: "text", text: c.text };
    case "image":
      return {
        type: "image",
        source: { type: "base64", media_type: c.mimeType, data: c.data },
      };
    case "toolCall":
      return { type: "tool_use", id: c.id, name: c.name, input: c.arguments };
    case "thinking":
      return { type: "thinking", thinking: c.thinking, signature: c.thinkingSignature ?? "" };
    default:
      return { type: "text", text: "" };
  }
}
/**
 * Minimal SSE parser for Vertex's streaming response.
 *
 * Chunks are buffered until a blank-line event boundary. The `data:` lines of
 * each event are joined and parsed as JSON, then passed to `onEvent`. The
 * `[DONE]` sentinel is ignored.
 *
 * - CRLF line endings are normalised to LF (the SSE format allows both), so
 *   boundaries are found either way, including when a CR/LF pair is split
 *   across chunks.
 * - Any final event not followed by a blank line is still delivered when the
 *   body ends.
 * - If `onEvent` throws (or the read fails), the body is cancelled so the
 *   underlying connection is released, and the error is rethrown.
 *
 * @throws whatever `onEvent`, `JSON.parse`, or the stream reader throws.
 */
async function parseSse(
  body: ReadableStream<Uint8Array>,
  onEvent: (event: any) => void,
) {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  const dispatch = (raw: string) => {
    const data = raw
      .split("\n")
      .filter((l) => l.startsWith("data:"))
      .map((l) => l.slice(5).trim())
      .join("\n");
    if (data && data !== "[DONE]") onEvent(JSON.parse(data));
  };
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      // Normalise the whole buffer so a CR and LF split across chunks still merge.
      buffer = (buffer + decoder.decode(value, { stream: true })).replace(/\r\n/g, "\n");
      let idx: number;
      while ((idx = buffer.indexOf("\n\n")) >= 0) {
        const raw = buffer.slice(0, idx);
        buffer = buffer.slice(idx + 2);
        dispatch(raw);
      }
    }
    // Flush bytes held by the streaming decoder and any unterminated last event.
    buffer = (buffer + decoder.decode()).replace(/\r\n/g, "\n");
    if (buffer) dispatch(buffer);
  } catch (error) {
    await reader.cancel(error).catch(() => {});
    throw error;
  } finally {
    reader.releaseLock();
  }
}
/**
 * Folds an Anthropic `usage` object into `output.usage`.
 *
 * Used by both the JSON and the SSE handlers, so cache counters
 * (cache_read_input_tokens, cache_creation_input_tokens) are captured for
 * either response format. A counter absent from `usage` keeps its previous
 * value, because usage can arrive across several events. `totalTokens` is
 * recomputed as the sum of the four counters. A falsy `usage` is ignored.
 */
function extractUsage(output: AssistantMessage, usage: any) {
  if (!usage) return;
  const current = output.usage;
  const {
    input_tokens: inputTokens,
    output_tokens: outputTokens,
    cache_read_input_tokens: cacheReadTokens,
    cache_creation_input_tokens: cacheWriteTokens,
  } = usage;
  current.input = inputTokens ?? current.input;
  current.output = outputTokens ?? current.output;
  current.cacheRead = cacheReadTokens ?? current.cacheRead;
  current.cacheWrite = cacheWriteTokens ?? current.cacheWrite;
  current.totalTokens = current.input + current.output + current.cacheRead + current.cacheWrite;
}
/**
 * Handles a non-streaming JSON response. Vertex occasionally returns a full
 * message as JSON (e.g. when the response is very short or on certain error
 * paths). The same start/delta/end event sequence as the SSE path is emitted,
 * so downstream consumers see a consistent stream.
 *
 * Stop reason: `max_tokens` → "length"; otherwise "toolUse" if any tool call
 * was produced, else "stop".
 *
 * @throws Error when the body is not an object, or is an Anthropic error
 *   envelope (`{ type: "error", error: { type, message } }`). Without this,
 *   such a payload would end as an empty, successful-looking message.
 */
function handleJsonResponse(
  stream: AssistantMessageEventStream,
  output: AssistantMessage,
  message: any,
) {
  if (!message || typeof message !== "object") {
    throw new Error("Vertex returned an empty or non-object JSON response");
  }
  if (message.type === "error") {
    const err = message.error ?? {};
    throw new Error(
      `Vertex Anthropic error${err.type ? ` (${err.type})` : ""}: ${err.message ?? JSON.stringify(message)}`,
    );
  }
  extractUsage(output, message.usage);
  for (const part of message.content ?? []) {
    if (part.type === "text") {
      const index = output.content.push({ type: "text", text: part.text ?? "" }) - 1;
      stream.push({ type: "text_start", contentIndex: index, partial: output });
      if (part.text) {
        stream.push({ type: "text_delta", contentIndex: index, delta: part.text, partial: output });
      }
      stream.push({ type: "text_end", contentIndex: index, content: part.text ?? "", partial: output });
    } else if (part.type === "thinking") {
      const index =
        output.content.push({
          type: "thinking",
          thinking: part.thinking ?? "",
          thinkingSignature: part.signature ?? "",
        }) - 1;
      stream.push({ type: "thinking_start", contentIndex: index, partial: output });
      if (part.thinking) {
        stream.push({ type: "thinking_delta", contentIndex: index, delta: part.thinking, partial: output });
      }
      stream.push({ type: "thinking_end", contentIndex: index, content: part.thinking ?? "", partial: output });
    } else if (part.type === "tool_use") {
      const toolCall = {
        type: "toolCall" as const,
        id: part.id,
        name: part.name,
        arguments: part.input ?? {},
      };
      const index = output.content.push(toolCall) - 1;
      stream.push({ type: "toolcall_start", contentIndex: index, partial: output });
      // The whole argument object is delivered as a single delta.
      stream.push({
        type: "toolcall_delta",
        contentIndex: index,
        delta: JSON.stringify(toolCall.arguments),
        partial: output,
      });
      stream.push({ type: "toolcall_end", contentIndex: index, toolCall, partial: output });
    }
  }
  output.stopReason =
    message.stop_reason === "max_tokens"
      ? "length"
      : output.content.some((b) => b.type === "toolCall")
        ? "toolUse"
        : "stop";
}
/**
 * Per-message map from Anthropic SSE block `index` to position in
 * `output.content`. Block types this handler does not model (e.g.
 * `redacted_thinking`) are not added to `output.content`, so the two index
 * spaces can diverge. Keyed weakly so entries disappear with the message.
 */
const sseBlockPositions = new WeakMap<AssistantMessage, Map<number, number>>();

/**
 * Processes a single SSE event from the Anthropic streaming response.
 *
 * Event lifecycle per content block:
 *   content_block_start → content_block_delta (repeated) → content_block_stop
 *
 * Block handling:
 * - Only text, thinking and tool_use blocks are materialised. Any other block
 *   type is left unmapped, so its deltas and stop are ignored instead of being
 *   applied to the wrong block.
 * - Emitted `contentIndex` values are positions in `output.content`.
 * - Tool call arguments arrive as partial JSON fragments. They are accumulated
 *   in a temporary `_json` scratch field on the block, parsed after each delta
 *   where possible, and the scratch field is removed on block stop.
 *
 * Usage arrives in message_start (input tokens) and message_delta (output
 * tokens, cache tokens). Both are routed through extractUsage().
 *
 * @throws Error on an SSE `error` event (e.g. overloaded_error mid-stream),
 *   so the caller reports a failure instead of a truncated "done" message.
 */
function handleEvent(
  stream: AssistantMessageEventStream,
  output: AssistantMessage,
  event: any,
) {
  // -- errors --
  if (event.type === "error") {
    const err = event.error ?? {};
    throw new Error(
      `Vertex Anthropic stream error${err.type ? ` (${err.type})` : ""}: ${err.message ?? JSON.stringify(event)}`,
    );
  }
  // -- usage --
  if (event.type === "message_start" && event.message?.usage) {
    extractUsage(output, event.message.usage);
  }
  if (event.type === "message_delta" && event.usage) {
    extractUsage(output, event.usage);
  }
  let positions = sseBlockPositions.get(output);
  if (!positions) {
    positions = new Map<number, number>();
    sseBlockPositions.set(output, positions);
  }
  // -- content block start --
  if (event.type === "content_block_start") {
    const b = event.content_block ?? {};
    if (b.type === "text") {
      const contentIndex = output.content.push({ type: "text", text: b.text ?? "" }) - 1;
      positions.set(event.index, contentIndex);
      stream.push({ type: "text_start", contentIndex, partial: output });
    } else if (b.type === "thinking") {
      const contentIndex =
        output.content.push({
          type: "thinking",
          thinking: "",
          thinkingSignature: "",
        }) - 1;
      positions.set(event.index, contentIndex);
      stream.push({ type: "thinking_start", contentIndex, partial: output });
    } else if (b.type === "tool_use") {
      const contentIndex =
        output.content.push({ type: "toolCall", id: b.id, name: b.name, arguments: b.input ?? {} }) - 1;
      positions.set(event.index, contentIndex);
      stream.push({ type: "toolcall_start", contentIndex, partial: output });
    }
    // Any other block type stays unmapped; its later events are ignored.
  }
  // -- content block delta --
  if (event.type === "content_block_delta") {
    const contentIndex = positions.get(event.index);
    if (contentIndex === undefined) return;
    const block = output.content[contentIndex];
    if (!block) return;
    if (block.type === "text" && event.delta?.text) {
      block.text += event.delta.text;
      stream.push({ type: "text_delta", contentIndex, delta: event.delta.text, partial: output });
    }
    if (block.type === "thinking" && event.delta?.thinking) {
      block.thinking += event.delta.thinking;
      stream.push({ type: "thinking_delta", contentIndex, delta: event.delta.thinking, partial: output });
    }
    if (block.type === "thinking" && event.delta?.type === "signature_delta") {
      block.thinkingSignature = (block.thinkingSignature || "") + (event.delta.signature ?? "");
    }
    if (block.type === "toolCall" && event.delta?.partial_json) {
      const scratch = ((block as any)._json ?? "") + event.delta.partial_json;
      (block as any)._json = scratch;
      try {
        (block as ToolCall).arguments = JSON.parse(scratch || "{}");
      } catch {
        // Incomplete JSON fragment: keep the last successfully parsed arguments.
      }
      stream.push({
        type: "toolcall_delta",
        contentIndex,
        delta: event.delta.partial_json,
        partial: output,
      });
    }
  }
  // -- content block stop --
  if (event.type === "content_block_stop") {
    const contentIndex = positions.get(event.index);
    if (contentIndex === undefined) return;
    const block = output.content[contentIndex];
    if (!block) return;
    if (block.type === "text") {
      stream.push({ type: "text_end", contentIndex, content: block.text, partial: output });
    }
    if (block.type === "thinking") {
      stream.push({ type: "thinking_end", contentIndex, content: block.thinking, partial: output });
    }
    if (block.type === "toolCall") {
      // Final parse attempt, then remove the scratch field from the message.
      try {
        (block as ToolCall).arguments = JSON.parse((block as any)._json || "{}");
      } catch {
        // keep whatever we parsed so far
      }
      delete (block as any)._json;
      stream.push({ type: "toolcall_end", contentIndex, toolCall: block, partial: output });
    }
  }
  // -- stop reason --
  if (event.type === "message_delta" && event.delta?.stop_reason === "max_tokens") {
    output.stopReason = "length";
  }
  if (event.type === "message_stop" && output.stopReason !== "length") {
    output.stopReason = output.content.some((b) => b.type === "toolCall") ? "toolUse" : "stop";
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment