Skip to content

Instantly share code, notes, and snippets.

@major
Last active June 22, 2026 15:20
Show Gist options
  • Select an option

  • Save major/fb6fc6691fb53e1e4153ce2c75152711 to your computer and use it in GitHub Desktop.

Select an option

Save major/fb6fc6691fb53e1e4153ce2c75152711 to your computer and use it in GitHub Desktop.
Pi coding agent extension: Anthropic Claude models via Google Vertex AI (streamRawPredict)
/**
* Pi extension: Google Vertex AI → Anthropic Claude
*
* Routes Claude model requests through the Vertex AI streamRawPredict endpoint
* instead of the direct Anthropic API. This lets you use Claude models under
* GCP billing and IAM — no Anthropic API key required.
*
* Auth: Application Default Credentials via `gcloud auth application-default`.
* The token is cached for 45 minutes (GCP ADC tokens expire after 60).
*
* Env vars (at least one from each pair required):
* GOOGLE_CLOUD_PROJECT / GCLOUD_PROJECT
* VERTEX_ANTHROPIC_LOCATION / GOOGLE_CLOUD_LOCATION
*
* The extension registers a custom streaming implementation (streamSimple)
* because Vertex wraps the Anthropic Messages API behind its own endpoint and
* auth scheme — pi's built-in anthropic-messages provider can't talk to it
* directly.
*
* Costs are zero because Vertex billing is handled at the GCP project level,
* not per-token via the Anthropic pricing API.
*/
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import {
calculateCost,
createAssistantMessageEventStream,
type AssistantMessage,
type AssistantMessageEventStream,
type Context,
type Model,
type SimpleStreamOptions,
type ToolCall,
} from "@earendil-works/pi-ai";
import { execFile } from "node:child_process";
/**
 * Fields every registered model starts from. Entries in MODELS only list what
 * differs (the 4.5-generation models use the smaller LEGACY_LIMITS below).
 */
const MODEL_DEFAULTS = {
  reasoning: true,
  contextWindow: 1_000_000,
  maxTokens: 128_000,
} as const;

/**
 * pi thinking level → Anthropic `thinking.budget_tokens`.
 * Levels missing from this table fall back to a default chosen by the caller.
 */
const THINKING_BUDGETS: Record<string, number> = {
  minimal: 1024,
  low: 4096,
  medium: 10240,
  high: 20480,
};

/** Context / output limits shared by the Haiku 4.5, Sonnet 4.5 and Opus 4.5 entries. */
const LEGACY_LIMITS = { contextWindow: 200_000, maxTokens: 64_000 };

/** Vertex model ids exposed by this provider, with display names and per-model overrides. */
const MODELS = [
  { id: "claude-haiku-4-5@20251001", name: "Claude Haiku 4.5 (Vertex)", ...LEGACY_LIMITS },
  { id: "claude-sonnet-4-5@20250929", name: "Claude Sonnet 4.5 (Vertex)", ...LEGACY_LIMITS },
  { id: "claude-sonnet-4-6@default", name: "Claude Sonnet 4.6 (Vertex)" },
  { id: "claude-opus-4-5@20251101", name: "Claude Opus 4.5 (Vertex)", ...LEGACY_LIMITS },
  { id: "claude-opus-4-6@default", name: "Claude Opus 4.6 (Vertex)" },
  { id: "claude-opus-4-7@default", name: "Claude Opus 4.7 (Vertex)" },
  { id: "claude-opus-4-8@default", name: "Claude Opus 4.8 (Vertex)" },
];
/**
 * ADC access token cache. GCP tokens live ~60 min; we refresh at 45 min to
 * avoid mid-request expiry. The token is fetched asynchronously via `gcloud`
 * to avoid blocking the event loop.
 */
const TOKEN_TTL_MS = 45 * 60 * 1000;
let cachedToken: { value: string; expiresAt: number } | undefined;
/**
 * Refresh currently in progress. Concurrent callers share this promise, so a
 * burst of requests after expiry spawns a single `gcloud` process, not one each.
 */
let pendingToken: Promise<string> | undefined;

/**
 * Returns an Application Default Credentials access token.
 *
 * Reuses the cached token while it is younger than TOKEN_TTL_MS. Otherwise it
 * starts (or joins) a single `gcloud auth application-default
 * print-access-token` invocation.
 *
 * @throws Error if `gcloud` fails (message includes its stderr) or prints no token.
 */
async function getAccessToken(): Promise<string> {
  if (cachedToken && Date.now() < cachedToken.expiresAt) {
    return cachedToken.value;
  }
  if (!pendingToken) {
    pendingToken = fetchAccessToken().finally(() => {
      pendingToken = undefined;
    });
  }
  return pendingToken;
}

/** Runs `gcloud` once, validates its output, and stores the result in cachedToken. */
function fetchAccessToken(): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    execFile(
      "gcloud",
      ["auth", "application-default", "print-access-token"],
      { encoding: "utf8" },
      (err, stdout, stderr) => {
        if (err) {
          // stderr carries gcloud's actionable message (e.g. "run gcloud auth
          // application-default login"); err.message alone often does not.
          const detail = String(stderr ?? "").trim() || err.message;
          reject(new Error(`gcloud auth application-default print-access-token failed: ${detail}`));
          return;
        }
        const value = String(stdout ?? "").trim();
        if (!value) {
          // Never cache an empty token: every request would then fail with 401
          // for the next 45 minutes.
          reject(new Error("gcloud auth application-default print-access-token returned an empty token"));
          return;
        }
        cachedToken = { value, expiresAt: Date.now() + TOKEN_TTL_MS };
        resolve(value);
      },
    );
  });
}
/**
 * pi extension entry point.
 *
 * Registers the "google-vertex-anthropic" provider, whose requests are served
 * by streamVertexAnthropic. `apiKey` is only a fixed placeholder: the request
 * path authenticates with a gcloud ADC bearer token and never reads it. Each
 * model starts from MODEL_DEFAULTS, is overridden by its own MODELS entry, and
 * is declared as text+image input with zero per-token cost.
 */
export default function (pi: ExtensionAPI) {
  const models = MODELS.map((entry) => {
    const zeroCost = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 };
    return {
      ...MODEL_DEFAULTS,
      ...entry,
      input: ["text", "image"] as const,
      cost: zeroCost,
    };
  });

  pi.registerProvider("google-vertex-anthropic", {
    name: "Google Vertex Anthropic",
    api: "vertex-anthropic" as any,
    baseUrl: "https://aiplatform.googleapis.com",
    apiKey: "vertex-adc",
    streamSimple: streamVertexAnthropic,
    models,
  });
}
/** Anthropic's minimum `thinking.budget_tokens`; the same amount is kept free for the visible answer. */
const MIN_THINKING_BUDGET = 1024;

/**
 * Fits a requested thinking budget into a request's max_tokens.
 *
 * Anthropic rejects requests unless 1024 <= budget_tokens < max_tokens. The
 * budget is raised to the minimum if needed, then capped so at least
 * MIN_THINKING_BUDGET tokens remain for the non-thinking part of the reply.
 *
 * @param requested - budget from the custom/level table
 * @param maxTokens - the request's max_tokens
 * @returns the budget to send, or undefined when max_tokens is too small for
 *   any valid budget (thinking is then left disabled rather than sending a
 *   request that is guaranteed to fail).
 */
function fitThinkingBudget(requested: number, maxTokens: number): number | undefined {
  const budget = Math.min(
    Math.max(requested, MIN_THINKING_BUDGET),
    maxTokens - MIN_THINKING_BUDGET,
  );
  return budget >= MIN_THINKING_BUDGET ? budget : undefined;
}

/**
 * Builds the raw Anthropic Messages API body that Vertex's streamRawPredict
 * endpoint accepts (with Vertex's `anthropic_version` in place of the header).
 */
function buildRequestBody(model: Model<any>, context: Context, options: SimpleStreamOptions): any {
  const maxTokens = options.maxTokens ?? model.maxTokens;
  const body: any = {
    anthropic_version: "vertex-2023-10-16",
    max_tokens: maxTokens,
    messages: toAnthropicMessages(context),
    ...(context.systemPrompt ? { system: context.systemPrompt } : {}),
    ...(context.tools?.length
      ? {
          tools: context.tools.map((t) => ({
            name: t.name,
            description: t.description,
            input_schema: t.parameters,
          })),
        }
      : {}),
  };
  if (options.reasoning && model.reasoning) {
    const customBudget =
      options.thinkingBudgets?.[options.reasoning as keyof typeof options.thinkingBudgets];
    const budget = fitThinkingBudget(
      customBudget ?? THINKING_BUDGETS[options.reasoning] ?? 10240,
      maxTokens,
    );
    if (budget !== undefined) {
      body.thinking = { type: "enabled", budget_tokens: budget };
    }
  }
  // Extended thinking is incompatible with temperature changes; sending one
  // alongside `thinking` makes the request fail, so it is only sent without it.
  if (options.temperature !== undefined && !body.thinking) {
    body.temperature = options.temperature;
  }
  return body;
}

/**
 * Custom streaming implementation for the Vertex Anthropic endpoint.
 *
 * Uses the streamRawPredict endpoint which accepts a raw Anthropic Messages API
 * body and returns SSE events in the same format as the direct Anthropic API.
 * Vertex may also return a single JSON response (non-streaming) for some error
 * cases, so we detect content-type and handle both paths.
 *
 * The async work runs in a detached IIFE so the stream object can be returned
 * synchronously — this is the pattern all pi providers follow. Any failure
 * (missing env vars, gcloud, HTTP, parsing) is reported as a single `error`
 * event; `stopReason` is "aborted" when `options.signal` fired.
 */
function streamVertexAnthropic(
  model: Model<any>,
  context: Context,
  options: SimpleStreamOptions = {},
) {
  const stream = createAssistantMessageEventStream();
  const output: AssistantMessage = {
    role: "assistant",
    content: [],
    api: model.api,
    provider: model.provider,
    model: model.id,
    usage: {
      input: 0,
      output: 0,
      cacheRead: 0,
      cacheWrite: 0,
      totalTokens: 0,
      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
    },
    stopReason: "stop",
    timestamp: Date.now(),
  };
  (async () => {
    try {
      const project = process.env.GOOGLE_CLOUD_PROJECT || process.env.GCLOUD_PROJECT;
      const location = process.env.VERTEX_ANTHROPIC_LOCATION || process.env.GOOGLE_CLOUD_LOCATION;
      if (!project || !location) {
        throw new Error(
          "Set GOOGLE_CLOUD_PROJECT (or GCLOUD_PROJECT) and VERTEX_ANTHROPIC_LOCATION (or GOOGLE_CLOUD_LOCATION)",
        );
      }
      let body = buildRequestBody(model, context, options);
      if (options.onPayload) body = (await options.onPayload(body, model)) ?? body;
      // The "global" location has no regional host prefix.
      const host =
        location === "global"
          ? "aiplatform.googleapis.com"
          : `${location}-aiplatform.googleapis.com`;
      const url = `https://${host}/v1/projects/${project}/locations/${location}/publishers/anthropic/models/${model.id}:streamRawPredict`;
      const accessToken = await getAccessToken();
      const response = await fetch(url, {
        method: "POST",
        headers: {
          authorization: `Bearer ${accessToken}`,
          "content-type": "application/json",
          ...options.headers,
        },
        body: JSON.stringify(body),
        signal: options.signal,
      });
      await options.onResponse?.(
        { status: response.status, headers: Object.fromEntries(response.headers.entries()) },
        model,
      );
      if (!response.ok) {
        throw new Error(`${response.status} ${response.statusText}: ${await response.text()}`);
      }
      if (!response.body) {
        throw new Error("Vertex returned no response body");
      }
      stream.push({ type: "start", partial: output });
      if (response.headers.get("content-type")?.includes("application/json")) {
        handleJsonResponse(stream, output, await response.json());
      } else {
        await parseSse(response.body, (event) => handleEvent(stream, output, event));
      }
      calculateCost(model, output.usage);
      stream.push({
        type: "done",
        reason: output.stopReason as "stop" | "length" | "toolUse",
        message: output,
      });
      stream.end();
    } catch (error) {
      output.stopReason = options.signal?.aborted ? "aborted" : "error";
      output.errorMessage = error instanceof Error ? error.message : String(error);
      stream.push({ type: "error", reason: output.stopReason, error: output });
      stream.end();
    }
  })();
  return stream;
}
/**
 * Converts pi's internal message history into Anthropic Messages API turns.
 *
 * Key mappings:
 * - user messages pass through (string, or content array mapped via contentPart)
 * - assistant messages are mapped block-by-block (thinking blocks keep their
 *   signatures for replay). Empty text blocks are dropped, and an assistant
 *   turn left with no content is skipped entirely. Anthropic rejects both
 *   empty text blocks and empty non-final turns, which aborted or errored
 *   responses can leave behind.
 * - toolResult messages become `tool_result` blocks inside a user turn
 *   (Anthropic requires tool results inside user turns). Consecutive results
 *   from parallel tool calls share one user turn. Results containing images
 *   are sent as text/image blocks instead of an "[image]" placeholder, so the
 *   model can actually see them.
 */
function toAnthropicMessages(context: Context) {
  const messages: any[] = [];
  // Anthropic rejects text blocks whose text is empty.
  const isNonEmpty = (part: any) => !(part.type === "text" && !part.text);
  for (const m of context.messages) {
    if (m.role === "user") {
      messages.push({
        role: "user",
        content: typeof m.content === "string" ? m.content : m.content.map(contentPart),
      });
    } else if (m.role === "assistant") {
      const content = m.content.map(contentPart).filter(isNonEmpty);
      if (content.length > 0) {
        messages.push({ role: "assistant", content });
      }
    } else if (m.role === "toolResult") {
      const hasImage = m.content.some((c: any) => c.type === "image");
      const result = {
        type: "tool_result",
        tool_use_id: m.toolCallId,
        // Text-only results keep the compact joined-string form; anything with
        // an image needs the block-array form to carry the image data.
        content: hasImage
          ? m.content.map(contentPart).filter(isNonEmpty)
          : m.content.map((c: any) => (c.type === "text" ? c.text : "[image]")).join("\n"),
        is_error: m.isError,
      };
      const previous = messages[messages.length - 1];
      const previousIsToolResults =
        previous?.role === "user" &&
        Array.isArray(previous.content) &&
        previous.content.length > 0 &&
        previous.content.every((b: any) => b.type === "tool_result");
      if (previousIsToolResults) {
        previous.content.push(result);
      } else {
        messages.push({ role: "user", content: [result] });
      }
    }
  }
  return messages;
}
/**
 * Translates one pi content block into the equivalent Anthropic block.
 *
 * - text → text
 * - image → base64 image source (pi's `mimeType`/`data` become `media_type`/`data`)
 * - toolCall → tool_use (pi's `arguments` become `input`)
 * - thinking → thinking, carrying `thinkingSignature` as `signature` so the
 *   block can be replayed. Blocks with no signature get "".
 *   NOTE(review): relies on Vertex accepting an empty signature — confirm.
 * - anything else → an empty text block
 */
function contentPart(c: any) {
  switch (c.type) {
    case "text":
      return { type: "text", text: c.text };
    case "image":
      return {
        type: "image",
        source: { type: "base64", media_type: c.mimeType, data: c.data },
      };
    case "toolCall":
      return { type: "tool_use", id: c.id, name: c.name, input: c.arguments };
    case "thinking":
      return { type: "thinking", thinking: c.thinking, signature: c.thinkingSignature ?? "" };
    default:
      return { type: "text", text: "" };
  }
}
/**
 * Parses one raw SSE event (the text between blank lines) and forwards its
 * JSON `data:` payload. Events without data (comments, keep-alives) and the
 * `[DONE]` sentinel are ignored.
 */
function dispatchSseEvent(raw: string, onEvent: (event: any) => void) {
  const data = raw
    .split("\n")
    .filter((l) => l.startsWith("data:"))
    .map((l) => l.slice(5).trim())
    .join("\n");
  if (data && data !== "[DONE]") onEvent(JSON.parse(data));
}

/**
 * Minimal SSE parser for Vertex's streaming response.
 *
 * Buffers decoded chunks and splits them on blank-line event boundaries.
 * CRLF line endings are accepted as well as LF. When the body ends, the
 * decoder is flushed and a final event that lacks the trailing blank line is
 * still dispatched.
 *
 * @param body - the HTTP response body
 * @param onEvent - called synchronously with each parsed JSON payload; if it
 *   (or JSON parsing) throws, the body is cancelled and the error rethrown.
 */
async function parseSse(
  body: ReadableStream<Uint8Array>,
  onEvent: (event: any) => void,
) {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  // Dispatches every complete event in `buffer`, keeping any trailing partial event.
  const drain = () => {
    // Normalise CRLF on the whole buffer so a "\r" at the end of one chunk
    // pairs correctly with a "\n" at the start of the next.
    buffer = buffer.replace(/\r\n/g, "\n");
    let boundary: number;
    while ((boundary = buffer.indexOf("\n\n")) >= 0) {
      const raw = buffer.slice(0, boundary);
      buffer = buffer.slice(boundary + 2);
      dispatchSseEvent(raw, onEvent);
    }
  };

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      drain();
    }
    buffer += decoder.decode();
    drain();
    if (buffer) dispatchSseEvent(buffer, onEvent);
  } catch (err) {
    // Stop the underlying HTTP body instead of leaving it half-read.
    await reader.cancel(err).catch(() => {});
    throw err;
  } finally {
    reader.releaseLock();
  }
}
/** pi usage field ← Anthropic usage field, in the order they are merged. */
const USAGE_FIELDS = [
  ["input", "input_tokens"],
  ["output", "output_tokens"],
  ["cacheRead", "cache_read_input_tokens"],
  ["cacheWrite", "cache_creation_input_tokens"],
] as const;

/**
 * Copies token counts from an Anthropic `usage` object onto `output.usage`.
 *
 * Only fields Anthropic actually reports (not null/undefined) overwrite the
 * current values, so partial usage payloads can be merged one after another.
 * `totalTokens` is then recomputed as input + output + cacheRead + cacheWrite.
 * A missing `usage` argument is a no-op. Both the JSON and SSE handlers use
 * this, so cache counts are captured in either response format.
 */
function extractUsage(output: AssistantMessage, usage: any) {
  if (!usage) return;
  const totals = output.usage;
  for (const [ours, anthropic] of USAGE_FIELDS) {
    const reported = usage[anthropic];
    if (reported !== undefined && reported !== null) {
      totals[ours] = reported;
    }
  }
  totals.totalTokens = totals.input + totals.output + totals.cacheRead + totals.cacheWrite;
}
/** Compact JSON rendering for error messages, truncated so huge bodies don't flood logs. */
function describeJson(value: unknown): string {
  const text = JSON.stringify(value) ?? String(value);
  return text.length > 500 ? `${text.slice(0, 500)}…` : text;
}

/**
 * Handles a non-streaming JSON response. Vertex occasionally returns a full
 * message as JSON (e.g. when the response is very short or on certain error
 * paths). For a normal message, we emit the same start/delta/end event
 * sequence that the SSE path produces, so downstream consumers see a
 * consistent stream.
 *
 * @throws Error when the body is an error payload (`type: "error"` or an
 *   `error` member) or not a JSON object. Such a body must become an error
 *   event, not an empty "stop" message.
 */
function handleJsonResponse(
  stream: AssistantMessageEventStream,
  output: AssistantMessage,
  message: any,
) {
  if (!message || typeof message !== "object" || Array.isArray(message)) {
    throw new Error(`Vertex returned an unexpected JSON response: ${describeJson(message)}`);
  }
  if (message.type === "error" || message.error) {
    const err = message.error;
    const kind = err?.type ?? err?.status;
    throw new Error(
      `Vertex error${kind ? ` (${kind})` : ""}: ${err?.message ?? describeJson(err ?? message)}`,
    );
  }
  if (message.usage) extractUsage(output, message.usage);
  for (const part of message.content ?? []) {
    if (part.type === "text") {
      const index = output.content.push({ type: "text", text: part.text ?? "" }) - 1;
      stream.push({ type: "text_start", contentIndex: index, partial: output });
      if (part.text) {
        stream.push({ type: "text_delta", contentIndex: index, delta: part.text, partial: output });
      }
      stream.push({ type: "text_end", contentIndex: index, content: part.text ?? "", partial: output });
    }
    if (part.type === "thinking") {
      const index =
        output.content.push({
          type: "thinking",
          thinking: part.thinking ?? "",
          thinkingSignature: part.signature ?? "",
        }) - 1;
      stream.push({ type: "thinking_start", contentIndex: index, partial: output });
      if (part.thinking) {
        stream.push({ type: "thinking_delta", contentIndex: index, delta: part.thinking, partial: output });
      }
      stream.push({ type: "thinking_end", contentIndex: index, content: part.thinking ?? "", partial: output });
    }
    if (part.type === "tool_use") {
      const toolCall = {
        type: "toolCall" as const,
        id: part.id,
        name: part.name,
        arguments: part.input ?? {},
      };
      const index = output.content.push(toolCall) - 1;
      stream.push({ type: "toolcall_start", contentIndex: index, partial: output });
      // The complete arguments are known up front, so they go out as one delta.
      stream.push({
        type: "toolcall_delta",
        contentIndex: index,
        delta: JSON.stringify(toolCall.arguments),
        partial: output,
      });
      stream.push({ type: "toolcall_end", contentIndex: index, toolCall, partial: output });
    }
  }
  output.stopReason =
    message.stop_reason === "max_tokens"
      ? "length"
      : output.content.some((b) => b.type === "toolCall")
        ? "toolUse"
        : "stop";
}
/**
 * Per-message map from Anthropic's content-block index (`event.index`) to the
 * position of the matching block in `output.content`.
 *
 * The two only coincide while every streamed block is materialized. Block types
 * this handler does not model (e.g. `redacted_thinking`) are skipped. After
 * such a skip, Anthropic's indices run ahead of `output.content`, and a direct
 * `output.content[event.index]` lookup would drop or misroute later deltas.
 */
const contentIndexByMessage = new WeakMap<AssistantMessage, Map<number, number>>();

/**
 * Processes a single SSE event from the Anthropic streaming response.
 *
 * Event lifecycle per content block:
 *   content_block_start → content_block_delta (repeated) → content_block_stop
 *
 * Tool call arguments arrive as partial JSON fragments. They accumulate in a
 * temporary `_json` scratch field on the block, are re-parsed after each
 * delta, and the scratch field is removed on block stop.
 *
 * Usage arrives in message_start and message_delta; both are routed through
 * extractUsage().
 *
 * @throws Error on an Anthropic `error` event (e.g. overloaded_error). The
 *   caller then reports an error instead of a silently truncated "stop" message.
 */
function handleEvent(
  stream: AssistantMessageEventStream,
  output: AssistantMessage,
  event: any,
) {
  switch (event?.type) {
    case "error": {
      const err = event.error;
      const kind = err?.type ? ` (${err.type})` : "";
      throw new Error(`Vertex stream error${kind}: ${err?.message ?? JSON.stringify(err)}`);
    }
    case "message_start":
      if (event.message?.usage) extractUsage(output, event.message.usage);
      return;
    case "message_delta":
      if (event.usage) extractUsage(output, event.usage);
      if (event.delta?.stop_reason === "max_tokens") output.stopReason = "length";
      return;
    case "message_stop":
      if (output.stopReason !== "length") {
        output.stopReason = output.content.some((b) => b.type === "toolCall") ? "toolUse" : "stop";
      }
      return;
    case "content_block_start":
      startContentBlock(stream, output, event);
      return;
    case "content_block_delta":
      applyContentBlockDelta(stream, output, event);
      return;
    case "content_block_stop":
      finishContentBlock(stream, output, event);
      return;
  }
}

/** Appends a newly started text/thinking/tool block and records its output index. */
function startContentBlock(stream: AssistantMessageEventStream, output: AssistantMessage, event: any) {
  const b = event.content_block;
  let contentIndex: number;
  switch (b?.type) {
    case "text":
      contentIndex = output.content.push({ type: "text", text: b.text ?? "" }) - 1;
      stream.push({ type: "text_start", contentIndex, partial: output });
      break;
    case "thinking":
      contentIndex = output.content.push({ type: "thinking", thinking: "", thinkingSignature: "" }) - 1;
      stream.push({ type: "thinking_start", contentIndex, partial: output });
      break;
    case "tool_use":
      contentIndex =
        output.content.push({ type: "toolCall", id: b.id, name: b.name, arguments: b.input ?? {} }) - 1;
      stream.push({ type: "toolcall_start", contentIndex, partial: output });
      break;
    default:
      // Unmodeled block (e.g. redacted_thinking): leave it unmapped so its
      // deltas and stop event are ignored without disturbing other blocks.
      return;
  }
  let indexMap = contentIndexByMessage.get(output);
  if (!indexMap) {
    indexMap = new Map();
    contentIndexByMessage.set(output, indexMap);
  }
  indexMap.set(event.index, contentIndex);
}

/** Resolves an Anthropic block index to the materialized output block, if any. */
function lookupContentBlock(output: AssistantMessage, anthropicIndex: number) {
  const contentIndex = contentIndexByMessage.get(output)?.get(anthropicIndex);
  if (contentIndex === undefined) return undefined;
  const block = output.content[contentIndex];
  return block ? { block, contentIndex } : undefined;
}

/** Applies a text, thinking, signature, or partial tool-argument delta to its block. */
function applyContentBlockDelta(stream: AssistantMessageEventStream, output: AssistantMessage, event: any) {
  const found = lookupContentBlock(output, event.index);
  if (!found) return;
  const { block, contentIndex } = found;
  const delta = event.delta;
  if (block.type === "text" && delta?.text) {
    block.text += delta.text;
    stream.push({ type: "text_delta", contentIndex, delta: delta.text, partial: output });
  }
  if (block.type === "thinking" && delta?.thinking) {
    block.thinking += delta.thinking;
    stream.push({ type: "thinking_delta", contentIndex, delta: delta.thinking, partial: output });
  }
  if (block.type === "thinking" && delta?.type === "signature_delta") {
    block.thinkingSignature = (block.thinkingSignature || "") + (delta.signature ?? "");
  }
  if (block.type === "toolCall" && delta?.partial_json) {
    const scratch = ((block as any)._json ?? "") + delta.partial_json;
    (block as any)._json = scratch;
    try {
      (block as ToolCall).arguments = JSON.parse(scratch || "{}");
    } catch {
      // JSON is still incomplete; keep the last successful parse.
    }
    stream.push({ type: "toolcall_delta", contentIndex, delta: delta.partial_json, partial: output });
  }
}

/** Emits the matching *_end event; tool calls get a final argument parse and scratch cleanup. */
function finishContentBlock(stream: AssistantMessageEventStream, output: AssistantMessage, event: any) {
  const found = lookupContentBlock(output, event.index);
  if (!found) return;
  const { block, contentIndex } = found;
  if (block.type === "text") {
    stream.push({ type: "text_end", contentIndex, content: block.text, partial: output });
  }
  if (block.type === "thinking") {
    stream.push({ type: "thinking_end", contentIndex, content: block.thinking, partial: output });
  }
  if (block.type === "toolCall") {
    try {
      (block as ToolCall).arguments = JSON.parse((block as any)._json || "{}");
    } catch {
      // Malformed final JSON: keep whatever parsed successfully so far.
    }
    delete (block as any)._json;
    stream.push({ type: "toolcall_end", contentIndex, toolCall: block, partial: output });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment