@iainlane
Created June 6, 2025 09:56
Reworking of the Anthropic manifold pipe for Open WebUI, adding dynamic model fetching, thinking budgets, and streaming/non-streaming responses.
"""
title: Anthropic Manifold Pipe
authors: justinh-rahb and christian-taillon; significantly refactored by iainlane
author_url: https://github.com/justinh-rahb
funding_url: https://github.com/open-webui
version: 2.0
required_open_webui_version: 0.3.17
license: MIT
"""
import requests
import json
import time
import logging
from urllib.parse import urlparse
from uuid import uuid4
from typing import (
List,
Literal,
Union,
Generator,
Optional,
Tuple,
Dict,
Any,
)
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message
# Constants and defaults
ANTHROPIC_VERSION: str = "2023-06-01"
BASE64_SIZE_MULTIPLIER: float = 3.0 / 4.0  # Ratio of decoded bytes to base64-encoded characters
DEFAULT_LOG_LEVEL: str = "INFO"
DEFAULT_MAX_TOKENS: int = 16384
DEFAULT_MODEL_CACHE_TTL: int = 3600
DEFAULT_THINKING_BUDGET: int = 4096
MODELS_REQUEST_TIMEOUT: int = 10 # Timeout for model fetching requests
THINKING_BUFFER_TOKENS: int = 1024 # Reserve tokens for response when thinking enabled
# Image processing
MAX_IMAGE_SIZE: int = 5 * 1024 * 1024 # 5MB per image
MAX_TOTAL_IMAGE_SIZE: int = 100 * 1024 * 1024 # 100MB total
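# Worked example (illustrative numbers, not from the API): a base64 payload of
# 7,000,000 characters decodes to roughly 7,000,000 * 0.75 = 5,250,000 bytes,
# just over the 5 MB (5,242,880-byte) per-image limit, so it would be rejected
# even though the raw string itself is under 7 MB.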
# Thinking configuration
THINKING_EFFORT_BUDGETS: Dict[str, int] = {
"low": 2048,
"medium": 4096,
"high": 8192,
}
THINKING_MODEL_PREFIXES: List[str] = [
"claude-opus-4-",
"claude-sonnet-4-",
"claude-3-7-sonnet-",
]
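# Prefix matching example (illustrative model IDs): "claude-sonnet-4-20250514"
# and "claude-3-7-sonnet-20250219" match the prefixes above and are listed with
# a "(Thinking)" suffix; an ID such as "claude-3-5-haiku-20241022" does not
# match, so thinking is never configured for it.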
# API configuration
ANTHROPIC_API_BASE: str = "https://api.anthropic.com/v1"
REQUEST_TIMEOUT: Tuple[float, float] = (3.05, 60) # (connect, read)
STREAM_DELAY: float = 0.01 # Small delay between chunks
# Logging setup
logger = logging.getLogger(__name__)
logger.setLevel(DEFAULT_LOG_LEVEL)
# Type definitions
MessageContent = Union[str, List[Dict[str, Any]]]
StreamChunk = Dict[str, Any]
ThinkingBlock = Dict[str, Any]
APIResponse = Dict[str, Any]
ProcessedMessage = Dict[str, Any]
class Pipe:
class Valves(BaseModel):
ANTHROPIC_API_KEY: str = Field(default="")
MODEL_CACHE_TTL: int = Field(default=DEFAULT_MODEL_CACHE_TTL)
ANTHROPIC_VERSION: str = Field(default=ANTHROPIC_VERSION)
DEFAULT_MAX_TOKENS: int = Field(default=DEFAULT_MAX_TOKENS)
ENABLE_THINKING_BY_DEFAULT: bool = Field(default=True)
DEFAULT_THINKING_BUDGET: int = Field(default=DEFAULT_THINKING_BUDGET)
DEFAULT_STREAM: bool = Field(default=True)
LOG_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(
default=DEFAULT_LOG_LEVEL
)
def __init__(self) -> None:
logger.debug("Initialising Anthropic Pipe")
self.type: str = "manifold"
self.id: str = "anthropic"
self.name: str = "Anthropic: "
self.valves: Pipe.Valves = self.Valves()
# Cache for models
self._cached_models: Optional[List[Dict[str, str]]] = None
self._cache_timestamp: Optional[float] = None
logger.info("Anthropic Pipe ready")
def _validate_valves(self) -> "Pipe.Valves":
"""Validate valve values and apply corrections if needed."""
def _safe_int(value: int, min_value: int, name: str, default: int) -> int:
if value < min_value:
logger.warning(
f"Invalid {name} value '{value}', using default {default}"
)
return default
return value
self.valves.MODEL_CACHE_TTL = _safe_int(
self.valves.MODEL_CACHE_TTL, 1, "MODEL_CACHE_TTL", DEFAULT_MODEL_CACHE_TTL
)
self.valves.DEFAULT_MAX_TOKENS = _safe_int(
self.valves.DEFAULT_MAX_TOKENS,
1,
"DEFAULT_MAX_TOKENS",
DEFAULT_MAX_TOKENS,
)
self.valves.DEFAULT_THINKING_BUDGET = _safe_int(
self.valves.DEFAULT_THINKING_BUDGET,
0,
"DEFAULT_THINKING_BUDGET",
DEFAULT_THINKING_BUDGET,
)
logger.setLevel(getattr(logging, self.valves.LOG_LEVEL))
return self.valves
def _is_cache_valid(self) -> bool:
"""Check if the cached models are still valid."""
if not self._cached_models or not self._cache_timestamp:
return False
age = time.time() - self._cache_timestamp
is_valid = age < self.valves.MODEL_CACHE_TTL
if not is_valid:
logger.info(
f"Model cache expired (age: {age:.1f}s > TTL: {self.valves.MODEL_CACHE_TTL}s)"
)
return is_valid
def _make_api_request(self) -> requests.Response:
"""Make the API request to fetch models."""
headers = {
"x-api-key": self.valves.ANTHROPIC_API_KEY,
"anthropic-version": self.valves.ANTHROPIC_VERSION,
"content-type": "application/json",
}
url = f"{ANTHROPIC_API_BASE}/models"
logger.info(f"Fetching models from: {url}")
response = requests.get(url, headers=headers, timeout=MODELS_REQUEST_TIMEOUT)
logger.info(f"Models API responded: HTTP {response.status_code}")
return response
def _process_api_models(self, data: Dict[str, Any]) -> List[Dict[str, str]]:
"""Process the models returned by the API."""
raw_models = data.get("data", [])
models = []
thinking_count = 0
for model in raw_models:
model_id = model.get("id", "")
display_name = model.get("display_name", model_id)
if self._supports_thinking(model_id):
display_name += " (Thinking)"
thinking_count += 1
models.append({"id": model_id, "name": display_name})
sorted_models = sorted(models, key=lambda x: x["name"])
logger.info(
f"Processed {len(sorted_models)} models ({thinking_count} with thinking support)"
)
return sorted_models
def _supports_thinking(self, model_id: str) -> bool:
"""Check if a model supports thinking."""
return any(model_id.startswith(prefix) for prefix in THINKING_MODEL_PREFIXES)
def _fetch_models_from_api(self) -> List[Dict[str, str]]:
"""Fetch available models from the Anthropic API."""
if not self.valves.ANTHROPIC_API_KEY:
logger.error("Cannot fetch models: No API key provided")
return []
try:
response = self._make_api_request()
if response.status_code != 200:
logger.error(f"API request failed: HTTP {response.status_code}")
if response.text:
logger.debug(f"Error response: {response.text[:500]}")
return []
data = response.json()
models = self._process_api_models(data)
logger.info(f"Successfully fetched {len(models)} models from API")
return models
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching models: {e}")
return []
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON response from models API: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error fetching models: {e}")
return []
def get_anthropic_models(self) -> List[Dict[str, str]]:
"""Get available Anthropic models, using cache if valid."""
if self._is_cache_valid():
return self._cached_models # type: ignore
models = self._fetch_models_from_api()
# Update cache
self._cached_models = models
self._cache_timestamp = time.time()
if not models:
logger.warning("No models available - cache updated with empty list")
return []
logger.info(f"Cached {len(models)} models")
return models
def pipes(self) -> List[Dict[str, str]]:
"""Required method for Open WebUI pipe interface."""
self._validate_valves()
return self.get_anthropic_models()
def _convert_usage_to_openai(
self,
usage_data: Dict[str, Any],
model_id: str,
thinking_blocks: Optional[List[ThinkingBlock]] = None,
) -> Dict[str, Any]:
"""Convert Anthropic usage format to OpenAI-compatible format with enhanced metadata."""
input_tokens = usage_data.get("input_tokens", 0)
output_tokens = usage_data.get("output_tokens", 0)
cache_creation = usage_data.get("cache_creation_input_tokens", 0)
cache_read = usage_data.get("cache_read_input_tokens", 0)
# Calculate reasoning tokens from thinking blocks
reasoning_tokens = 0
thinking_chars = 0
if thinking_blocks:
for block in thinking_blocks:
if block.get("type") == "thinking":
thinking_text = block.get("thinking", "")
thinking_chars += len(thinking_text)
# Rough estimation: ~4 chars per token for reasoning content
reasoning_tokens += len(thinking_text) // 4
# Calculate cache efficiency
cache_hit_rate = 0.0
total_cache_tokens = cache_creation + cache_read
if input_tokens > 0:
cache_hit_rate = round((cache_read / input_tokens) * 100, 2)
# Enhanced OpenAI-compatible usage with additional metadata
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
"completion_tokens_details": {
"reasoning_tokens": reasoning_tokens,
"accepted_prediction_tokens": 0,
"rejected_prediction_tokens": 0,
},
"cache_creation_input_tokens": cache_creation,
"cache_read_input_tokens": cache_read,
"model": model_id,
# Additional metadata (similar to Ollama format)
"anthropic_metadata": {
"thinking_blocks": len(thinking_blocks) if thinking_blocks else 0,
"thinking_characters": thinking_chars,
"cache_hit_rate_percent": cache_hit_rate,
"cache_efficiency": "high"
if cache_hit_rate > 50
else "medium"
if cache_hit_rate > 20
else "low",
"total_cache_tokens": total_cache_tokens,
},
}
return usage
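    # Illustrative conversion (values chosen for the example, not captured API
    # output): an Anthropic usage block of {"input_tokens": 1000,
    # "output_tokens": 200, "cache_read_input_tokens": 300} plus one thinking
    # block of 800 characters produces roughly:
    #
    #   {
    #       "prompt_tokens": 1000,
    #       "completion_tokens": 200,
    #       "total_tokens": 1200,
    #       "completion_tokens_details": {"reasoning_tokens": 200, ...},
    #       "cache_read_input_tokens": 300,
    #       "anthropic_metadata": {
    #           "thinking_blocks": 1,
    #           "thinking_characters": 800,
    #           "cache_hit_rate_percent": 30.0,
    #           "cache_efficiency": "medium",
    #           ...
    #       },
    #   }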
def _process_content_blocks(
self, content_blocks: List[Dict[str, Any]]
) -> Tuple[str, str, List[ThinkingBlock]]:
"""Process content blocks and extract text, reasoning, and thinking blocks."""
content = ""
reasoning_parts = []
thinking_blocks: List[ThinkingBlock] = []
for i, block in enumerate(content_blocks):
block_type = block.get("type", "unknown")
match block_type:
case "text":
text = block.get("text", "")
content += text
logger.debug(
f"Added text content from block {i}: {len(text)} chars"
)
case "thinking":
thinking_text = block.get("thinking", block.get("text", ""))
signature = block.get("signature")
thinking_block: ThinkingBlock = {
"type": "thinking",
"thinking": thinking_text,
}
if signature:
thinking_block["signature"] = signature
thinking_blocks.append(thinking_block)
reasoning_parts.append(thinking_text)
logger.debug(
f"Added thinking block {i}: {len(thinking_text)} chars"
)
case "redacted_thinking":
redacted_data = block.get("data", "")
thinking_blocks.append(
{"type": "redacted_thinking", "data": redacted_data}
)
reasoning_parts.append(
"[Some reasoning content was redacted for safety]"
)
logger.debug(f"Added redacted thinking block {i}")
# Combine all reasoning parts
reasoning_content = "\n\n".join(reasoning_parts) if reasoning_parts else ""
if thinking_blocks:
logger.info(
f"Processed {len(thinking_blocks)} thinking blocks with {len(reasoning_content)} chars total reasoning"
)
return content, reasoning_content, thinking_blocks
def _format_reasoning_for_nonstream_ui(
self, reasoning_content: str, thinking_blocks: List[ThinkingBlock]
) -> str:
"""Format reasoning content using details tags for non-streaming UI compatibility."""
if not reasoning_content:
return ""
# Format with quote markers (matching streaming behavior)
reasoning_display = "\n".join(
f"> {line}" if not line.startswith(">") else line
for line in reasoning_content.splitlines()
)
# Check for redacted content
has_redacted = any(
block.get("type") == "redacted_thinking" for block in thinking_blocks
)
# Format using the same details structure as streaming (without duration)
summary_text = "Thinking…" if has_redacted else "Thinking…"
if has_redacted:
summary_text += " (some content redacted for safety)"
details_html = (
f'<details type="reasoning" done="true">\n'
f"<summary>{summary_text}</summary>\n"
f"{reasoning_display}\n"
f"</details>\n\n"
)
return details_html
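    # Example output (illustrative): reasoning text "step one\nstep two" with
    # no redacted blocks is rendered as:
    #
    #   <details type="reasoning" done="true">
    #   <summary>Thinking…</summary>
    #   > step one
    #   > step two
    #   </details>
    #
    # which is the same collapsible structure the UI uses for streamed reasoning.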
def convert_anthropic_response_to_openai(
self, anthropic_response: Dict[str, Any], model_id: str
) -> APIResponse:
"""Convert Anthropic response format to OpenAI-compatible format."""
content_blocks = anthropic_response.get("content", [])
content, reasoning_content, thinking_blocks = self._process_content_blocks(
content_blocks
)
# Add formatted reasoning to content using details tags for non-streaming UI
if reasoning_content:
formatted_reasoning = self._format_reasoning_for_nonstream_ui(
reasoning_content, thinking_blocks
)
content = formatted_reasoning + content
# Convert usage with thinking blocks for accurate reasoning token count
usage = self._convert_usage_to_openai(
anthropic_response.get("usage", {}), model_id, thinking_blocks
)
# Build OpenAI response
stop_reason = anthropic_response.get("stop_reason", "stop")
openai_response: APIResponse = {
"id": f"chatcmpl-{uuid4()}",
"object": "chat.completion",
"created": int(time.time()),
"model": model_id,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": content},
"finish_reason": self._map_stop_reason(stop_reason),
}
],
"usage": usage,
}
# Preserve thinking blocks for continuity
if thinking_blocks:
openai_response["anthropic_thinking_blocks"] = thinking_blocks
return openai_response
def convert_anthropic_stream_to_openai(
self,
chunk_data: Dict[str, Any],
model_id: str,
thinking_blocks: Optional[List[ThinkingBlock]] = None,
) -> StreamChunk:
"""Convert Anthropic streaming chunk to OpenAI-compatible format."""
delta: Dict[str, Any] = {}
finish_reason: Optional[str] = None
usage: Optional[Dict[str, Any]] = None
chunk_type = chunk_data.get("type")
match chunk_type:
case "message_start":
delta = {"role": "assistant", "content": ""}
case "content_block_start":
content_block = chunk_data.get("content_block", {})
if content_block.get("type") == "text":
text = content_block.get("text", "")
delta = {"content": text}
case "content_block_delta":
delta_content = chunk_data.get("delta", {})
delta_type = delta_content.get("type")
                match delta_type:
                    case "text_delta":
                        text = delta_content.get("text", "")
                        delta = {"content": text}
                    case "reasoning_content":
                        # Synthetic delta type emitted by the thinking stream
                        # handlers below; forwarded under an OpenAI-style
                        # reasoning_content field so it is not silently
                        # dropped.
                        text = delta_content.get("text", "")
                        delta = {"reasoning_content": text}
case "message_delta":
usage_data = chunk_data.get("usage", {})
if usage_data:
usage = self._convert_usage_to_openai(
usage_data, model_id, thinking_blocks
)
case "message_stop":
finish_reason = "stop"
response: StreamChunk = {
"id": f"chatcmpl-{uuid4()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model_id,
"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
}
if usage:
response["usage"] = usage
return response
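    # Illustrative mapping (example event, not captured API output): the
    # Anthropic event {"type": "content_block_delta",
    # "delta": {"type": "text_delta", "text": "Hi"}} becomes an OpenAI-style
    # chunk of the form:
    #
    #   {
    #       "object": "chat.completion.chunk",
    #       "choices": [{"index": 0, "delta": {"content": "Hi"},
    #                    "finish_reason": None}],
    #       ...
    #   }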
def _format_api_error(self, status_code: int, error_text: str) -> str:
"""Format API errors to be more user-friendly."""
try:
error_data = json.loads(error_text)
except (json.JSONDecodeError, KeyError):
# Fall back to generic error if we can't parse JSON
return f"HTTP Error {status_code}: {error_text[:200]}{'...' if len(error_text) > 200 else ''}"
if "error" not in error_data or "message" not in error_data["error"]:
return f"HTTP Error {status_code}: {error_text[:200]}{'...' if len(error_text) > 200 else ''}"
message = error_data["error"]["message"]
error_type = error_data.get("error", {}).get("type", "")
match error_type:
case "authentication_error":
return "Authentication failed. Please check your API key."
case "rate_limit_error":
return "Rate limit exceeded. Please try again later."
case "invalid_request_error":
return f"Request error: {message}"
case _:
return f"API Error: {message}"
def _map_stop_reason(self, anthropic_reason: str) -> str:
"""Map Anthropic stop reasons to OpenAI format."""
mapping = {
"end_turn": "stop",
"max_tokens": "length",
"stop_sequence": "stop",
"tool_use": "tool_calls",
}
return mapping.get(anthropic_reason, "stop")
def _validate_image_url(self, url: str) -> bool:
"""Validate image URL for security."""
try:
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
logger.warning(
f"Rejected image URL with invalid scheme: {parsed.scheme}"
)
return False
# Block common internal/private addresses
hostname = parsed.hostname
if hostname:
hostname = hostname.lower()
# Block localhost, private IPs, and common internal hostnames
blocked_hosts = {
"localhost",
"127.0.0.1",
"0.0.0.0",
"::1",
"metadata.google.internal",
"instance-data",
"169.254.169.254",
}
if hostname in blocked_hosts:
logger.warning(
f"Rejected image URL with blocked hostname: {hostname}"
)
return False
                # Coarse prefix check: this also blocks public 172.x.x.x
                # addresses outside the 172.16.0.0/12 private range, erring
                # on the side of caution.
                if hostname.startswith(("10.", "192.168.", "172.")):
logger.warning(f"Rejected image URL with private IP: {hostname}")
return False
return True
except Exception as e:
logger.warning(f"Failed to parse image URL: {e}")
return False
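    # Examples (illustrative URLs): "https://example.com/cat.png" passes, while
    # "http://169.254.169.254/latest/meta-data" (cloud metadata endpoint),
    # "http://localhost/image.png" and "ftp://host/image.png" are all rejected.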
def _validate_image_size(self, image_data: Dict[str, Any]) -> Dict[str, Any]:
"""Process and validate image data for INPUT to Claude.
Note: Claude cannot generate images - this code only handles images
that users upload for Claude to analyze. We validate size and security
constraints before sending to Anthropic's API.
"""
url = image_data["image_url"]["url"]
if url.startswith("data:image"):
return self._process_base64_image(url)
return self._process_remote_image(url)
def _process_base64_image(self, url: str) -> Dict[str, Any]:
"""Process and validate base64 image data for Claude analysis.
Claude accepts images as input for analysis but cannot generate images.
This validates size limits and converts to Anthropic's format.
"""
        try:
            mime_type, base64_data = url.split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]
        except (ValueError, IndexError) as e:
            # Only parsing failures are reported as invalid data; the size
            # check below raises its own, more specific error.
            error_msg = f"Invalid base64 image data: {e}"
            logger.error(error_msg)
            raise ValueError(error_msg)
        image_size = len(base64_data) * BASE64_SIZE_MULTIPLIER
        size_mb = image_size / (1024 * 1024)
        logger.debug(f"Processing base64 image: {size_mb:.2f}MB")
        if image_size > MAX_IMAGE_SIZE:
            error_msg = f"Base64 image exceeds 5MB limit: {size_mb:.2f}MB"
            logger.error(error_msg)
            raise ValueError(error_msg)
        return {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": media_type,
                "data": base64_data,
            },
        }
def _process_remote_image(self, url: str) -> Dict[str, Any]:
"""Process and validate remote image URL for Claude analysis.
Claude accepts images as input for analysis but cannot generate images.
This validates the URL for security and size constraints.
"""
# Validate URL for security (prevent SSRF attacks)
if not self._validate_image_url(url):
error_msg = "Image URL rejected for security reasons"
logger.error(error_msg)
raise ValueError(error_msg)
try:
logger.debug(f"Checking remote image size: {url}")
response = requests.head(url, allow_redirects=True, timeout=10)
response.raise_for_status()
content_length = int(response.headers.get("content-length", 0))
size_mb = content_length / (1024 * 1024)
logger.debug(f"Remote image size: {size_mb:.2f}MB")
if content_length > MAX_IMAGE_SIZE:
error_msg = f"Remote image exceeds 5MB limit: {size_mb:.2f}MB"
logger.error(error_msg)
raise ValueError(error_msg)
return {"type": "image", "source": {"type": "url", "url": url}}
except requests.exceptions.RequestException as e:
error_msg = f"Failed to validate remote image: {e}"
logger.error(error_msg)
raise ValueError(error_msg)
def _process_messages(
self, messages: List[Dict[str, Any]]
) -> Tuple[List[ProcessedMessage], int]:
"""Process messages and return processed messages with total image size.
Handles text and image content from users. Images are only _input_ for
Claude to analyse - Claude cannot generate or return images, only text
responses.
"""
logger.debug(f"Processing {len(messages)} messages")
processed_messages: List[ProcessedMessage] = []
total_image_size = 0
for i, message in enumerate(messages):
logger.debug(
f"Processing message {i}: role={message.get('role', 'unknown')}"
)
processed_content = []
content = message.get("content", "")
if not isinstance(content, list):
# Simple text content
processed_content = [{"type": "text", "text": str(content)}]
else:
                for item in content:
item_type = item.get("type", "unknown")
match item_type:
case "text":
text = item.get("text", "")
processed_content.append({"type": "text", "text": text})
case "image_url":
processed_image = self._validate_image_size(item)
processed_content.append(processed_image)
# Track total image size for base64 images
if processed_image["source"]["type"] != "base64":
continue
image_size = (
len(processed_image["source"]["data"])
* BASE64_SIZE_MULTIPLIER
)
total_image_size += image_size
if total_image_size > MAX_TOTAL_IMAGE_SIZE:
size_mb = total_image_size / (1024 * 1024)
error_msg = f"Total image size exceeds 100MB limit: {size_mb:.2f}MB"
logger.error(error_msg)
raise ValueError(error_msg)
logger.debug(
f"Total image size: {total_image_size / (1024 * 1024):.2f}MB"
)
processed_message: ProcessedMessage = {
"role": message["role"],
"content": processed_content,
}
processed_messages.append(processed_message)
if total_image_size > 0:
logger.info(
f"Processed {len(processed_messages)} messages with {total_image_size / (1024 * 1024):.2f}MB total images"
)
else:
logger.debug(f"Processed {len(processed_messages)} messages (no images)")
return processed_messages, int(total_image_size)
def _configure_thinking(
self, body: Dict[str, Any], model_id: str, payload: Dict[str, Any]
) -> bool:
"""Configure thinking parameters and return whether thinking is enabled."""
supports_thinking = self._supports_thinking(model_id)
if not supports_thinking:
return False
if not self.valves.ENABLE_THINKING_BY_DEFAULT:
return False
thinking_config = body.get("thinking", True)
if thinking_config is False:
logger.info("Thinking disabled by user request")
return False
thinking_enabled = True
if isinstance(thinking_config, dict):
payload["thinking"] = thinking_config
logger.info(f"Applied custom thinking config: {thinking_config}")
return thinking_enabled
if thinking_config not in (True, None):
return thinking_enabled
budget_tokens = body.get("thinking_budget", self.valves.DEFAULT_THINKING_BUDGET)
max_budget = max(0, payload["max_tokens"] - THINKING_BUFFER_TOKENS)
final_budget = max(0, min(budget_tokens, max_budget))
if final_budget == 0:
logger.warning(
f"Thinking budget is 0 (max_tokens: {payload['max_tokens']}, buffer: {THINKING_BUFFER_TOKENS})"
)
payload["thinking"] = {
"type": "enabled",
"budget_tokens": final_budget,
}
logger.info(
f"Configured thinking: budget={final_budget} tokens (requested={budget_tokens}, max_available={max_budget})"
)
# Handle reasoning_effort parameter
reasoning_effort = body.get("reasoning_effort")
if not reasoning_effort:
if thinking_enabled:
logger.info(f"Thinking enabled for model {model_id}")
return thinking_enabled
budget = THINKING_EFFORT_BUDGETS.get(
reasoning_effort, THINKING_EFFORT_BUDGETS["medium"]
)
max_budget = max(0, payload["max_tokens"] - THINKING_BUFFER_TOKENS)
final_budget = max(0, min(budget, max_budget))
if final_budget == 0:
logger.warning(
f"Thinking budget from reasoning_effort is 0 (effort: {reasoning_effort})"
)
if "thinking" not in payload:
payload["thinking"] = {
"type": "enabled",
"budget_tokens": final_budget,
}
else:
payload["thinking"]["budget_tokens"] = final_budget
logger.info(
f"Applied reasoning_effort '{reasoning_effort}': thinking budget set to {final_budget} tokens"
)
return thinking_enabled
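    # Budget example (illustrative numbers): with max_tokens=16384 and
    # reasoning_effort="high", the candidate budget is 8192 tokens and the cap
    # is 16384 - 1024 = 15360, so the final thinking budget is
    # min(8192, 15360) = 8192. With max_tokens=2048 the cap drops to 1024, so
    # the same request is clamped to a 1024-token budget.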
def _apply_thinking_constraints(self, payload: Dict[str, Any]) -> None:
"""Apply parameter constraints when thinking is enabled."""
logger.debug("Applying thinking parameter constraints")
# Temperature must be 1.0
original_temp = payload.get("temperature")
if original_temp != 1.0:
payload["temperature"] = 1.0
logger.debug(f"Adjusted temperature: {original_temp} -> 1.0")
# Remove top_k
if "top_k" in payload:
removed_top_k = payload.pop("top_k")
logger.debug(f"Removed top_k: {removed_top_k}")
# Constrain top_p to [0.95, 1.0]
if "top_p" in payload:
original_top_p = payload["top_p"]
if not (0.95 <= original_top_p <= 1.0):
new_top_p = max(0.95, min(1.0, original_top_p))
payload["top_p"] = new_top_p
logger.debug(f"Adjusted top_p: {original_top_p} -> {new_top_p}")
logger.debug("Thinking constraints applied")
def _build_payload(
self,
body: Dict[str, Any],
model_id: str,
processed_messages: List[ProcessedMessage],
system_message: Optional[str],
) -> Dict[str, Any]:
"""Build the API payload."""
logger.debug(f"Building payload for model: {model_id}")
# Apply default stream setting if not specified
stream_setting = body.get("stream", self.valves.DEFAULT_STREAM)
logger.debug(
f"Stream setting: {stream_setting} (default: {self.valves.DEFAULT_STREAM})"
)
payload = {
"model": model_id,
"messages": processed_messages,
"max_tokens": body.get("max_tokens", self.valves.DEFAULT_MAX_TOKENS),
"temperature": body.get("temperature", 0.8),
"top_k": body.get("top_k", 40),
"top_p": body.get("top_p", 0.9),
"stop_sequences": body.get("stop", []),
"stream": stream_setting,
}
logger.debug(
f"Base payload: max_tokens={payload['max_tokens']}, "
f"temperature={payload['temperature']}, stream={payload['stream']}"
)
if system_message:
payload["system"] = str(system_message)
logger.debug(f"Added system message: {len(str(system_message))} chars")
# Configure thinking
thinking_enabled = self._configure_thinking(body, model_id, payload)
if thinking_enabled:
logger.debug("Applying thinking constraints")
self._apply_thinking_constraints(payload)
# Handle custom metadata
metadata = body.get("metadata")
if metadata and isinstance(metadata, dict):
anthropic_metadata = {
key.replace("anthropic_", ""): value
for key, value in metadata.items()
if key.startswith("anthropic_")
}
if anthropic_metadata:
payload["metadata"] = anthropic_metadata
logger.debug(f"Added metadata: {anthropic_metadata}")
return payload
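    # Example payload (illustrative request body): a body of
    # {"model": "claude-sonnet-4-20250514", "stream": True, "messages": [...]}
    # with the valve defaults produces roughly:
    #
    #   {
    #       "model": "claude-sonnet-4-20250514",
    #       "messages": [...],
    #       "max_tokens": 16384,
    #       "temperature": 1.0,   # forced by the thinking constraints
    #       "top_p": 0.95,        # clamped up from the 0.9 default
    #       "stop_sequences": [],
    #       "stream": True,
    #       "thinking": {"type": "enabled", "budget_tokens": 4096},
    #   }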
def _build_headers(self, model_id: str) -> Dict[str, str]:
"""Build API request headers."""
logger.debug(f"Building headers for model: {model_id}")
headers = {
"x-api-key": self.valves.ANTHROPIC_API_KEY,
"anthropic-version": self.valves.ANTHROPIC_VERSION,
"content-type": "application/json",
}
# Add beta header for Claude 4 interleaved thinking
if self._supports_thinking(model_id) and model_id.startswith(
("claude-opus-4-", "claude-sonnet-4-")
):
headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
logger.debug("Added interleaved thinking beta header")
return headers
def _handle_thinking_stream_start(
self, data: Dict[str, Any], thinking_blocks: List[ThinkingBlock], model_id: str
) -> Tuple[bool, Optional[str]]:
"""Handle the start of thinking-related stream events."""
if data.get("type") != "content_block_start":
return False, None
content_block = data.get("content_block", {})
block_type = content_block.get("type")
match block_type:
case "thinking":
logger.debug("Started thinking block")
return True, None
case "redacted_thinking":
redacted_data = content_block.get("data", "")
thinking_blocks.append(
{"type": "redacted_thinking", "data": redacted_data}
)
# Emit redacted notice
reasoning_chunk = {
"type": "content_block_delta",
"delta": {
"type": "reasoning_content",
"text": "[Some reasoning content was redacted for safety]",
},
}
openai_chunk = self.convert_anthropic_stream_to_openai(
reasoning_chunk, model_id, thinking_blocks
)
chunk_data = f"data: {json.dumps(openai_chunk)}\n\n"
logger.info("Emitted redacted thinking notice")
return False, chunk_data
case _:
return False, None
def _process_thinking_delta(
self,
delta: Dict[str, Any],
accumulated_reasoning: str,
current_signature: str,
model_id: str,
thinking_blocks: List[ThinkingBlock],
) -> Tuple[str, str, Optional[str]]:
"""Process thinking delta and return updated state and chunk to emit."""
delta_type = delta.get("type")
match delta_type:
case "signature_delta":
sig_chunk = delta.get("signature", "")
current_signature += sig_chunk
return accumulated_reasoning, current_signature, None
case "thinking_delta":
text = delta.get("thinking", "")
case "text_delta":
text = delta.get("text", "")
case _:
text = delta.get("text", "")
if not text:
return accumulated_reasoning, current_signature, None
accumulated_reasoning += text
# Emit reasoning content
reasoning_chunk = {
"type": "content_block_delta",
"delta": {"type": "reasoning_content", "text": text},
}
openai_chunk = self.convert_anthropic_stream_to_openai(
reasoning_chunk, model_id, thinking_blocks
)
chunk_data = f"data: {json.dumps(openai_chunk)}\n\n"
return accumulated_reasoning, current_signature, chunk_data
def _handle_thinking_stream_end(
self,
accumulated_reasoning: str,
current_signature: str,
thinking_blocks: List[ThinkingBlock],
) -> None:
"""Handle the end of a thinking stream block."""
logger.debug("Handling thinking stream end")
logger.debug(f"Thinking complete: {len(accumulated_reasoning)} chars")
thinking_block: ThinkingBlock = {
"type": "thinking",
"thinking": accumulated_reasoning,
}
if current_signature:
thinking_block["signature"] = current_signature
logger.debug(f"Stored signature: {len(current_signature)} chars")
thinking_blocks.append(thinking_block)
logger.debug(f"Total thinking blocks: {len(thinking_blocks)}")
def _process_stream_line(self, line: str) -> Optional[Dict[str, Any]]:
"""Process a single stream line and return parsed data."""
if not line or not line.startswith("data: "):
return None
try:
data = json.loads(line[6:])
logger.debug(f"Parsed stream data: type={data.get('type', 'unknown')}")
return data
except json.JSONDecodeError as e:
logger.error(f"Failed to parse stream JSON: {line}, Error: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error parsing stream line: {e}")
return None
def _emit_final_stream_chunk(
self, data: Dict[str, Any], thinking_blocks: List[ThinkingBlock], model_id: str
) -> str:
"""Emit the final stream chunk with thinking blocks."""
final_chunk = self.convert_anthropic_stream_to_openai(
data, model_id, thinking_blocks
)
if thinking_blocks:
final_chunk["anthropic_thinking_blocks"] = thinking_blocks
logger.debug(f"Added {len(thinking_blocks)} thinking blocks to final chunk")
return f"data: {json.dumps(final_chunk)}\n\n"
def _stream_response(
self, url: str, headers: Dict[str, str], payload: Dict[str, Any]
) -> Generator[str, None, None]:
"""Handle streaming response."""
model_id = payload.get("model", "claude")
logger.info(f"Starting stream response for model: {model_id}")
# Stream state
accumulated_reasoning = ""
current_signature = ""
in_thinking_block = False
thinking_blocks: List[ThinkingBlock] = []
chunk_count = 0
try:
logger.info(f"Starting stream request to: {url}")
with requests.post(
url, headers=headers, json=payload, stream=True, timeout=REQUEST_TIMEOUT
) as response:
if response.status_code != 200:
error_text = response.text
logger.error(f"Stream API error: HTTP {response.status_code}")
if error_text:
logger.debug(f"Error response: {error_text[:500]}")
# Parse and format error for better user experience
user_friendly_error = self._format_api_error(
response.status_code, error_text
)
raise Exception(user_friendly_error)
for line in response.iter_lines():
if not line:
continue
line_str = line.decode("utf-8")
chunk_count += 1
data = self._process_stream_line(line_str)
if not data:
continue
# Handle thinking content
if in_thinking_block:
if data.get("type") == "content_block_delta":
delta = data.get("delta", {})
accumulated_reasoning, current_signature, chunk = (
self._process_thinking_delta(
delta,
accumulated_reasoning,
current_signature,
model_id,
thinking_blocks,
)
)
if chunk:
yield chunk
continue
elif data.get("type") == "content_block_stop":
self._handle_thinking_stream_end(
accumulated_reasoning,
current_signature,
thinking_blocks,
)
in_thinking_block = False
accumulated_reasoning = ""
current_signature = ""
continue
# Handle thinking block start
thinking_started, chunk = self._handle_thinking_stream_start(
data, thinking_blocks, model_id
)
if thinking_started:
in_thinking_block = True
accumulated_reasoning = ""
current_signature = ""
continue
if chunk:
yield chunk
continue
# Handle final message
if data.get("type") == "message_stop":
final_chunk = self._emit_final_stream_chunk(
data, thinking_blocks, model_id
)
yield final_chunk
break
# Convert and emit regular content
if not in_thinking_block:
openai_chunk = self.convert_anthropic_stream_to_openai(
data, model_id, thinking_blocks
)
yield f"data: {json.dumps(openai_chunk)}\n\n"
time.sleep(STREAM_DELAY)
# Log thinking token information (consistent with non-streaming)
if thinking_blocks:
total_thinking_chars = sum(
len(block.get("thinking", ""))
for block in thinking_blocks
if block.get("type") == "thinking"
)
logger.info(
f"Stream response contains {len(thinking_blocks)} thinking blocks ({total_thinking_chars} chars)"
)
logger.info(f"Stream complete: processed {chunk_count} chunks")
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Stream response error: {e}")
error_chunk = {
"id": f"chatcmpl-{uuid4()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model_id,
"choices": [
{
"index": 0,
"delta": {"content": f"Error: {e}"},
"finish_reason": "stop",
}
],
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"
def _non_stream_response(
self, url: str, headers: Dict[str, str], payload: Dict[str, Any]
) -> APIResponse:
"""Handle non-streaming response."""
model_id = payload.get("model", "claude")
try:
logger.info(f"Making non-stream request for model {model_id} to {url}")
response = requests.post(
url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
)
if response.status_code != 200:
error_text = response.text
logger.error(f"Non-stream API error: HTTP {response.status_code}")
if error_text:
logger.debug(f"Error response: {error_text[:500]}")
# Parse and format error for better user experience
user_friendly_error = self._format_api_error(
response.status_code, error_text
)
raise Exception(user_friendly_error)
anthropic_response = response.json()
# Log content analysis
content_blocks = anthropic_response.get("content", [])
if content_blocks:
thinking_blocks = [
b for b in content_blocks if b.get("type") == "thinking"
]
if thinking_blocks:
total_thinking_chars = sum(
len(b.get("thinking", "")) for b in thinking_blocks
)
logger.info(
f"Response contains {len(thinking_blocks)} thinking blocks ({total_thinking_chars} chars)"
)
converted_response = self.convert_anthropic_response_to_openai(
anthropic_response, model_id
)
logger.info("Response conversion complete")
return converted_response
except Exception as e:
logger.error(f"Non-stream request error: {e}")
return {
"id": f"chatcmpl-{uuid4()}",
"object": "chat.completion",
"created": int(time.time()),
"model": model_id,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": f"Error: {e}"},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
},
}
def pipe(
self, body: Dict[str, Any]
) -> Union[str, Generator[str, None, None], APIResponse]:
"""Main entrypoint."""
model_name = body.get("model", "unknown")
stream_mode = body.get("stream", "not specified")
message_count = len(body.get("messages", []))
logger.info(
f"Processing request: model={model_name}, stream={stream_mode}, messages={message_count}"
)
try:
# Extract and process messages
system_message, messages = pop_system_message(body["messages"])
processed_messages, _ = self._process_messages(messages)
# Extract model ID
full_model = body["model"]
model_id = (
full_model[full_model.find(".") + 1 :]
if "." in full_model
else full_model
)
# Build payload and headers
payload = self._build_payload(
body, model_id, processed_messages, system_message
)
headers = self._build_headers(model_id)
url = f"{ANTHROPIC_API_BASE}/messages"
thinking_enabled = payload.get("thinking") is not None
logger.info(
f"Request configured: model={model_id}, stream={payload.get('stream')}, "
f"max_tokens={payload.get('max_tokens')}, thinking={thinking_enabled}"
)
# Route to appropriate handler depending on whether we're streaming
# or not.
if payload.get("stream", False):
return self._stream_response(url, headers, payload)
return self._non_stream_response(url, headers, payload)
except Exception as e:
logger.error(f"Request processing error: {e}")
return f"Error: {e}"