Created
June 6, 2025 09:56
Reworking of the Anthropic manifold pipeline for Open WebUI: it fetches the model list dynamically from the API, supports thinking budgets, and handles both streaming and non-streaming responses.
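For reference, the pipe() entrypoint reads its options from the Open WebUI request body. A rough sketch of the fields this file actually inspects (illustrative values, not Open WebUI defaults):

    body = {
        "model": "anthropic.claude-sonnet-4-20250514",  # "<pipe id>.<model id>"; text before the first "." is stripped
        "messages": [{"role": "user", "content": "Hello"}],
        "stream": True,  # falls back to the DEFAULT_STREAM valve
        "max_tokens": 16384,  # falls back to the DEFAULT_MAX_TOKENS valve
        "temperature": 0.8,
        "thinking": True,  # False disables thinking; a dict is passed through verbatim
        "thinking_budget": 4096,  # clamped to max_tokens - THINKING_BUFFER_TOKENS
        "reasoning_effort": "medium",  # "low" | "medium" | "high" (see THINKING_EFFORT_BUDGETS)
        "metadata": {"anthropic_user_id": "example"},  # anthropic_*-prefixed keys are forwarded
    }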
"""
title: Anthropic Manifold Pipe
authors: justinh-rahb and christian-taillon; significantly refactored by iainlane
author_url: https://github.com/justinh-rahb
funding_url: https://github.com/open-webui
version: 2.0
required_open_webui_version: 0.3.17
license: MIT
"""

import requests
import json
import time
import logging
from urllib.parse import urlparse
from uuid import uuid4
from typing import (
    List,
    Literal,
    Union,
    Generator,
    Optional,
    Tuple,
    Dict,
    Any,
)

from pydantic import BaseModel, Field

from open_webui.utils.misc import pop_system_message

# Constants and defaults
ANTHROPIC_VERSION: str = "2023-06-01"
BASE64_SIZE_MULTIPLIER: float = 3.0 / 4.0  # Base64 encoding size calculation
DEFAULT_LOG_LEVEL: str = "INFO"
DEFAULT_MAX_TOKENS: int = 16384
DEFAULT_MODEL_CACHE_TTL: int = 3600
DEFAULT_THINKING_BUDGET: int = 4096
MODELS_REQUEST_TIMEOUT: int = 10  # Timeout for model fetching requests
THINKING_BUFFER_TOKENS: int = 1024  # Reserve tokens for response when thinking enabled

# Image processing
MAX_IMAGE_SIZE: int = 5 * 1024 * 1024  # 5MB per image
MAX_TOTAL_IMAGE_SIZE: int = 100 * 1024 * 1024  # 100MB total

# Thinking configuration
THINKING_EFFORT_BUDGETS: Dict[str, int] = {
    "low": 2048,
    "medium": 4096,
    "high": 8192,
}
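# Worked example of how these budgets interact with max_tokens (see
# _configure_thinking below): the effective budget is clamped to
# max_tokens - THINKING_BUFFER_TOKENS. With the default max_tokens of 16384 the
# cap is 15360, so "low", "medium" and "high" all fit unchanged; with
# max_tokens=4096 the cap drops to 3072, and a "medium" (4096) or "high" (8192)
# request is reduced to 3072.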

THINKING_MODEL_PREFIXES: List[str] = [
    "claude-opus-4-",
    "claude-sonnet-4-",
    "claude-3-7-sonnet-",
]

# API configuration
ANTHROPIC_API_BASE: str = "https://api.anthropic.com/v1"
REQUEST_TIMEOUT: Tuple[float, float] = (3.05, 60)  # (connect, read)
STREAM_DELAY: float = 0.01  # Small delay between chunks

# Logging setup
logger = logging.getLogger(__name__)
logger.setLevel(DEFAULT_LOG_LEVEL)

# Type definitions
MessageContent = Union[str, List[Dict[str, Any]]]
StreamChunk = Dict[str, Any]
ThinkingBlock = Dict[str, Any]
APIResponse = Dict[str, Any]
ProcessedMessage = Dict[str, Any]


class Pipe:
    class Valves(BaseModel):
        ANTHROPIC_API_KEY: str = Field(default="")
        MODEL_CACHE_TTL: int = Field(default=DEFAULT_MODEL_CACHE_TTL)
        ANTHROPIC_VERSION: str = Field(default=ANTHROPIC_VERSION)
        DEFAULT_MAX_TOKENS: int = Field(default=DEFAULT_MAX_TOKENS)
        ENABLE_THINKING_BY_DEFAULT: bool = Field(default=True)
        DEFAULT_THINKING_BUDGET: int = Field(default=DEFAULT_THINKING_BUDGET)
        DEFAULT_STREAM: bool = Field(default=True)
        LOG_LEVEL: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(
            default=DEFAULT_LOG_LEVEL
        )

    def __init__(self) -> None:
        logger.debug("Initialising Anthropic Pipe")

        self.type: str = "manifold"
        self.id: str = "anthropic"
        self.name: str = "Anthropic: "
        self.valves: Pipe.Valves = self.Valves()

        # Cache for models
        self._cached_models: Optional[List[Dict[str, str]]] = None
        self._cache_timestamp: Optional[float] = None

        logger.info("Anthropic Pipe ready")

    def _validate_valves(self) -> "Pipe.Valves":
        """Validate valve values and apply corrections if needed."""

        def _safe_int(value: int, min_value: int, name: str, default: int) -> int:
            if value < min_value:
                logger.warning(
                    f"Invalid {name} value '{value}', using default {default}"
                )
                return default
            return value

        self.valves.MODEL_CACHE_TTL = _safe_int(
            self.valves.MODEL_CACHE_TTL, 1, "MODEL_CACHE_TTL", DEFAULT_MODEL_CACHE_TTL
        )
        self.valves.DEFAULT_MAX_TOKENS = _safe_int(
            self.valves.DEFAULT_MAX_TOKENS,
            1,
            "DEFAULT_MAX_TOKENS",
            DEFAULT_MAX_TOKENS,
        )
        self.valves.DEFAULT_THINKING_BUDGET = _safe_int(
            self.valves.DEFAULT_THINKING_BUDGET,
            0,
            "DEFAULT_THINKING_BUDGET",
            DEFAULT_THINKING_BUDGET,
        )

        logger.setLevel(getattr(logging, self.valves.LOG_LEVEL))

        return self.valves

    def _is_cache_valid(self) -> bool:
        """Check if the cached models are still valid."""
        if not self._cached_models or not self._cache_timestamp:
            return False

        age = time.time() - self._cache_timestamp
        is_valid = age < self.valves.MODEL_CACHE_TTL

        if not is_valid:
            logger.info(
                f"Model cache expired (age: {age:.1f}s > TTL: {self.valves.MODEL_CACHE_TTL}s)"
            )

        return is_valid

    def _make_api_request(self) -> requests.Response:
        """Make the API request to fetch models."""
        headers = {
            "x-api-key": self.valves.ANTHROPIC_API_KEY,
            "anthropic-version": self.valves.ANTHROPIC_VERSION,
            "content-type": "application/json",
        }

        url = f"{ANTHROPIC_API_BASE}/models"
        logger.info(f"Fetching models from: {url}")

        response = requests.get(url, headers=headers, timeout=MODELS_REQUEST_TIMEOUT)
        logger.info(f"Models API responded: HTTP {response.status_code}")

        return response

    def _process_api_models(self, data: Dict[str, Any]) -> List[Dict[str, str]]:
        """Process the models returned by the API."""
        raw_models = data.get("data", [])
        models = []
        thinking_count = 0

        for model in raw_models:
            model_id = model.get("id", "")
            display_name = model.get("display_name", model_id)

            if self._supports_thinking(model_id):
                display_name += " (Thinking)"
                thinking_count += 1

            models.append({"id": model_id, "name": display_name})

        sorted_models = sorted(models, key=lambda x: x["name"])
        logger.info(
            f"Processed {len(sorted_models)} models ({thinking_count} with thinking support)"
        )

        return sorted_models

    def _supports_thinking(self, model_id: str) -> bool:
        """Check if a model supports thinking."""
        return any(model_id.startswith(prefix) for prefix in THINKING_MODEL_PREFIXES)

    def _fetch_models_from_api(self) -> List[Dict[str, str]]:
        """Fetch available models from the Anthropic API."""
        if not self.valves.ANTHROPIC_API_KEY:
            logger.error("Cannot fetch models: No API key provided")
            return []

        try:
            response = self._make_api_request()

            if response.status_code != 200:
                logger.error(f"API request failed: HTTP {response.status_code}")
                if response.text:
                    logger.debug(f"Error response: {response.text[:500]}")
                return []

            data = response.json()
            models = self._process_api_models(data)

            logger.info(f"Successfully fetched {len(models)} models from API")
            return models
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error fetching models: {e}")
            return []
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response from models API: {e}")
            return []
        except Exception as e:
            logger.error(f"Unexpected error fetching models: {e}")
            return []

    def get_anthropic_models(self) -> List[Dict[str, str]]:
        """Get available Anthropic models, using cache if valid."""
        if self._is_cache_valid():
            return self._cached_models  # type: ignore

        models = self._fetch_models_from_api()

        # Update cache
        self._cached_models = models
        self._cache_timestamp = time.time()

        if not models:
            logger.warning("No models available - cache updated with empty list")
            return []

        logger.info(f"Cached {len(models)} models")
        return models

    def pipes(self) -> List[Dict[str, str]]:
        """Required method for Open WebUI pipe interface."""
        self._validate_valves()
        return self.get_anthropic_models()

    def _convert_usage_to_openai(
        self,
        usage_data: Dict[str, Any],
        model_id: str,
        thinking_blocks: Optional[List[ThinkingBlock]] = None,
    ) -> Dict[str, Any]:
        """Convert Anthropic usage format to OpenAI-compatible format with enhanced metadata."""
        input_tokens = usage_data.get("input_tokens", 0)
        output_tokens = usage_data.get("output_tokens", 0)
        cache_creation = usage_data.get("cache_creation_input_tokens", 0)
        cache_read = usage_data.get("cache_read_input_tokens", 0)

        # Calculate reasoning tokens from thinking blocks
        reasoning_tokens = 0
        thinking_chars = 0
        if thinking_blocks:
            for block in thinking_blocks:
                if block.get("type") == "thinking":
                    thinking_text = block.get("thinking", "")
                    thinking_chars += len(thinking_text)
                    # Rough estimation: ~4 chars per token for reasoning content
                    reasoning_tokens += len(thinking_text) // 4

        # Calculate cache efficiency
        cache_hit_rate = 0.0
        total_cache_tokens = cache_creation + cache_read
        if input_tokens > 0:
            cache_hit_rate = round((cache_read / input_tokens) * 100, 2)

        # Enhanced OpenAI-compatible usage with additional metadata
        usage = {
            "prompt_tokens": input_tokens,
            "completion_tokens": output_tokens,
            "total_tokens": input_tokens + output_tokens,
            "completion_tokens_details": {
                "reasoning_tokens": reasoning_tokens,
                "accepted_prediction_tokens": 0,
                "rejected_prediction_tokens": 0,
            },
            "cache_creation_input_tokens": cache_creation,
            "cache_read_input_tokens": cache_read,
            "model": model_id,
            # Additional metadata (similar to Ollama format)
            "anthropic_metadata": {
                "thinking_blocks": len(thinking_blocks) if thinking_blocks else 0,
                "thinking_characters": thinking_chars,
                "cache_hit_rate_percent": cache_hit_rate,
                "cache_efficiency": "high"
                if cache_hit_rate > 50
                else "medium"
                if cache_hit_rate > 20
                else "low",
                "total_cache_tokens": total_cache_tokens,
            },
        }

        return usage

    def _process_content_blocks(
        self, content_blocks: List[Dict[str, Any]]
    ) -> Tuple[str, str, List[ThinkingBlock]]:
        """Process content blocks and extract text, reasoning, and thinking blocks."""
        content = ""
        reasoning_parts = []
        thinking_blocks: List[ThinkingBlock] = []

        for i, block in enumerate(content_blocks):
            block_type = block.get("type", "unknown")

            match block_type:
                case "text":
                    text = block.get("text", "")
                    content += text
                    logger.debug(
                        f"Added text content from block {i}: {len(text)} chars"
                    )
                case "thinking":
                    thinking_text = block.get("thinking", block.get("text", ""))
                    signature = block.get("signature")

                    thinking_block: ThinkingBlock = {
                        "type": "thinking",
                        "thinking": thinking_text,
                    }
                    if signature:
                        thinking_block["signature"] = signature

                    thinking_blocks.append(thinking_block)
                    reasoning_parts.append(thinking_text)
                    logger.debug(
                        f"Added thinking block {i}: {len(thinking_text)} chars"
                    )
                case "redacted_thinking":
                    redacted_data = block.get("data", "")
                    thinking_blocks.append(
                        {"type": "redacted_thinking", "data": redacted_data}
                    )
                    reasoning_parts.append(
                        "[Some reasoning content was redacted for safety]"
                    )
                    logger.debug(f"Added redacted thinking block {i}")

        # Combine all reasoning parts
        reasoning_content = "\n\n".join(reasoning_parts) if reasoning_parts else ""

        if thinking_blocks:
            logger.info(
                f"Processed {len(thinking_blocks)} thinking blocks with {len(reasoning_content)} chars total reasoning"
            )

        return content, reasoning_content, thinking_blocks

    def _format_reasoning_for_nonstream_ui(
        self, reasoning_content: str, thinking_blocks: List[ThinkingBlock]
    ) -> str:
        """Format reasoning content using details tags for non-streaming UI compatibility."""
        if not reasoning_content:
            return ""

        # Format with quote markers (matching streaming behavior)
        reasoning_display = "\n".join(
            f"> {line}" if not line.startswith(">") else line
            for line in reasoning_content.splitlines()
        )

        # Check for redacted content
        has_redacted = any(
            block.get("type") == "redacted_thinking" for block in thinking_blocks
        )

        # Format using the same details structure as streaming (without duration)
        summary_text = "Thinking…"
        if has_redacted:
            summary_text += " (some content redacted for safety)"

        details_html = (
            f'<details type="reasoning" done="true">\n'
            f"<summary>{summary_text}</summary>\n"
            f"{reasoning_display}\n"
            f"</details>\n\n"
        )

        return details_html
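
    # For reference, a two-line thought is rendered for the non-streaming UI as:
    #
    #   <details type="reasoning" done="true">
    #   <summary>Thinking…</summary>
    #   > First line of reasoning
    #   > Second line of reasoning
    #   </details>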

    def convert_anthropic_response_to_openai(
        self, anthropic_response: Dict[str, Any], model_id: str
    ) -> APIResponse:
        """Convert Anthropic response format to OpenAI-compatible format."""
        content_blocks = anthropic_response.get("content", [])
        content, reasoning_content, thinking_blocks = self._process_content_blocks(
            content_blocks
        )

        # Add formatted reasoning to content using details tags for non-streaming UI
        if reasoning_content:
            formatted_reasoning = self._format_reasoning_for_nonstream_ui(
                reasoning_content, thinking_blocks
            )
            content = formatted_reasoning + content

        # Convert usage with thinking blocks for accurate reasoning token count
        usage = self._convert_usage_to_openai(
            anthropic_response.get("usage", {}), model_id, thinking_blocks
        )

        # Build OpenAI response
        stop_reason = anthropic_response.get("stop_reason", "stop")
        openai_response: APIResponse = {
            "id": f"chatcmpl-{uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model_id,
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": content},
                    "finish_reason": self._map_stop_reason(stop_reason),
                }
            ],
            "usage": usage,
        }

        # Preserve thinking blocks for continuity
        if thinking_blocks:
            openai_response["anthropic_thinking_blocks"] = thinking_blocks

        return openai_response

    def convert_anthropic_stream_to_openai(
        self,
        chunk_data: Dict[str, Any],
        model_id: str,
        thinking_blocks: Optional[List[ThinkingBlock]] = None,
    ) -> StreamChunk:
        """Convert Anthropic streaming chunk to OpenAI-compatible format."""
        delta: Dict[str, Any] = {}
        finish_reason: Optional[str] = None
        usage: Optional[Dict[str, Any]] = None

        chunk_type = chunk_data.get("type")

        match chunk_type:
            case "message_start":
                delta = {"role": "assistant", "content": ""}
            case "content_block_start":
                content_block = chunk_data.get("content_block", {})
                if content_block.get("type") == "text":
                    text = content_block.get("text", "")
                    delta = {"content": text}
            case "content_block_delta":
                delta_content = chunk_data.get("delta", {})
                delta_type = delta_content.get("type")

                match delta_type:
                    case "text_delta":
                        text = delta_content.get("text", "")
                        delta = {"content": text}
            case "message_delta":
                usage_data = chunk_data.get("usage", {})
                if usage_data:
                    usage = self._convert_usage_to_openai(
                        usage_data, model_id, thinking_blocks
                    )
            case "message_stop":
                finish_reason = "stop"

        response: StreamChunk = {
            "id": f"chatcmpl-{uuid4()}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model_id,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }

        if usage:
            response["usage"] = usage

        return response

    def _format_api_error(self, status_code: int, error_text: str) -> str:
        """Format API errors to be more user-friendly."""
        try:
            error_data = json.loads(error_text)
        except (json.JSONDecodeError, KeyError):
            # Fall back to generic error if we can't parse JSON
            return f"HTTP Error {status_code}: {error_text[:200]}{'...' if len(error_text) > 200 else ''}"

        if "error" not in error_data or "message" not in error_data["error"]:
            return f"HTTP Error {status_code}: {error_text[:200]}{'...' if len(error_text) > 200 else ''}"

        message = error_data["error"]["message"]
        error_type = error_data.get("error", {}).get("type", "")

        match error_type:
            case "authentication_error":
                return "Authentication failed. Please check your API key."
            case "rate_limit_error":
                return "Rate limit exceeded. Please try again later."
            case "invalid_request_error":
                return f"Request error: {message}"
            case _:
                return f"API Error: {message}"

    def _map_stop_reason(self, anthropic_reason: str) -> str:
        """Map Anthropic stop reasons to OpenAI format."""
        mapping = {
            "end_turn": "stop",
            "max_tokens": "length",
            "stop_sequence": "stop",
            "tool_use": "tool_calls",
        }
        return mapping.get(anthropic_reason, "stop")

    def _validate_image_url(self, url: str) -> bool:
        """Validate image URL for security."""
        try:
            parsed = urlparse(url)

            if parsed.scheme not in ("http", "https"):
                logger.warning(
                    f"Rejected image URL with invalid scheme: {parsed.scheme}"
                )
                return False

            # Block common internal/private addresses
            hostname = parsed.hostname
            if hostname:
                hostname = hostname.lower()

                # Block localhost, private IPs, and common internal hostnames
                blocked_hosts = {
                    "localhost",
                    "127.0.0.1",
                    "0.0.0.0",
                    "::1",
                    "metadata.google.internal",
                    "instance-data",
                    "169.254.169.254",
                }

                if hostname in blocked_hosts:
                    logger.warning(
                        f"Rejected image URL with blocked hostname: {hostname}"
                    )
                    return False

                if hostname.startswith(("10.", "192.168.", "172.")):
                    logger.warning(f"Rejected image URL with private IP: {hostname}")
                    return False

            return True
        except Exception as e:
            logger.warning(f"Failed to parse image URL: {e}")
            return False

    def _validate_image_size(self, image_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process and validate image data for INPUT to Claude.

        Note: Claude cannot generate images - this code only handles images
        that users upload for Claude to analyze. We validate size and security
        constraints before sending to Anthropic's API.
        """
        url = image_data["image_url"]["url"]

        if url.startswith("data:image"):
            return self._process_base64_image(url)

        return self._process_remote_image(url)

    def _process_base64_image(self, url: str) -> Dict[str, Any]:
        """Process and validate base64 image data for Claude analysis.

        Claude accepts images as input for analysis but cannot generate images.
        This validates size limits and converts to Anthropic's format.
        """
        try:
            mime_type, base64_data = url.split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]

            image_size = len(base64_data) * BASE64_SIZE_MULTIPLIER
            size_mb = image_size / (1024 * 1024)
            logger.debug(f"Processing base64 image: {size_mb:.2f}MB")

            if image_size > MAX_IMAGE_SIZE:
                error_msg = f"Base64 image exceeds 5MB limit: {size_mb:.2f}MB"
                logger.error(error_msg)
                raise ValueError(error_msg)

            return {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }
        except (ValueError, IndexError) as e:
            error_msg = f"Invalid base64 image data: {e}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    def _process_remote_image(self, url: str) -> Dict[str, Any]:
        """Process and validate remote image URL for Claude analysis.

        Claude accepts images as input for analysis but cannot generate images.
        This validates the URL for security and size constraints.
        """
        # Validate URL for security (prevent SSRF attacks)
        if not self._validate_image_url(url):
            error_msg = "Image URL rejected for security reasons"
            logger.error(error_msg)
            raise ValueError(error_msg)

        try:
            logger.debug(f"Checking remote image size: {url}")
            response = requests.head(url, allow_redirects=True, timeout=10)
            response.raise_for_status()

            content_length = int(response.headers.get("content-length", 0))
            size_mb = content_length / (1024 * 1024)
            logger.debug(f"Remote image size: {size_mb:.2f}MB")

            if content_length > MAX_IMAGE_SIZE:
                error_msg = f"Remote image exceeds 5MB limit: {size_mb:.2f}MB"
                logger.error(error_msg)
                raise ValueError(error_msg)

            return {"type": "image", "source": {"type": "url", "url": url}}
        except requests.exceptions.RequestException as e:
            error_msg = f"Failed to validate remote image: {e}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    def _process_messages(
        self, messages: List[Dict[str, Any]]
    ) -> Tuple[List[ProcessedMessage], int]:
        """Process messages and return processed messages with total image size.

        Handles text and image content from users. Images are only _input_ for
        Claude to analyse - Claude cannot generate or return images, only text
        responses.
        """
        logger.debug(f"Processing {len(messages)} messages")

        processed_messages: List[ProcessedMessage] = []
        total_image_size = 0

        for i, message in enumerate(messages):
            logger.debug(
                f"Processing message {i}: role={message.get('role', 'unknown')}"
            )

            processed_content = []
            content = message.get("content", "")

            if not isinstance(content, list):
                # Simple text content
                processed_content = [{"type": "text", "text": str(content)}]
            else:
                for item in content:
                    item_type = item.get("type", "unknown")

                    match item_type:
                        case "text":
                            text = item.get("text", "")
                            processed_content.append({"type": "text", "text": text})
                        case "image_url":
                            processed_image = self._validate_image_size(item)
                            processed_content.append(processed_image)

                            # Track total image size for base64 images
                            if processed_image["source"]["type"] != "base64":
                                continue

                            image_size = (
                                len(processed_image["source"]["data"])
                                * BASE64_SIZE_MULTIPLIER
                            )
                            total_image_size += image_size

                            if total_image_size > MAX_TOTAL_IMAGE_SIZE:
                                size_mb = total_image_size / (1024 * 1024)
                                error_msg = f"Total image size exceeds 100MB limit: {size_mb:.2f}MB"
                                logger.error(error_msg)
                                raise ValueError(error_msg)

                            logger.debug(
                                f"Total image size: {total_image_size / (1024 * 1024):.2f}MB"
                            )

            processed_message: ProcessedMessage = {
                "role": message["role"],
                "content": processed_content,
            }
            processed_messages.append(processed_message)

        if total_image_size > 0:
            logger.info(
                f"Processed {len(processed_messages)} messages with {total_image_size / (1024 * 1024):.2f}MB total images"
            )
        else:
            logger.debug(f"Processed {len(processed_messages)} messages (no images)")

        return processed_messages, int(total_image_size)

    def _configure_thinking(
        self, body: Dict[str, Any], model_id: str, payload: Dict[str, Any]
    ) -> bool:
        """Configure thinking parameters and return whether thinking is enabled."""
        supports_thinking = self._supports_thinking(model_id)

        if not supports_thinking:
            return False

        if not self.valves.ENABLE_THINKING_BY_DEFAULT:
            return False

        thinking_config = body.get("thinking", True)
        if thinking_config is False:
            logger.info("Thinking disabled by user request")
            return False

        thinking_enabled = True

        if isinstance(thinking_config, dict):
            payload["thinking"] = thinking_config
            logger.info(f"Applied custom thinking config: {thinking_config}")
            return thinking_enabled

        if thinking_config not in (True, None):
            return thinking_enabled

        budget_tokens = body.get("thinking_budget", self.valves.DEFAULT_THINKING_BUDGET)
        max_budget = max(0, payload["max_tokens"] - THINKING_BUFFER_TOKENS)
        final_budget = max(0, min(budget_tokens, max_budget))

        if final_budget == 0:
            logger.warning(
                f"Thinking budget is 0 (max_tokens: {payload['max_tokens']}, buffer: {THINKING_BUFFER_TOKENS})"
            )

        payload["thinking"] = {
            "type": "enabled",
            "budget_tokens": final_budget,
        }
        logger.info(
            f"Configured thinking: budget={final_budget} tokens (requested={budget_tokens}, max_available={max_budget})"
        )

        # Handle reasoning_effort parameter
        reasoning_effort = body.get("reasoning_effort")
        if not reasoning_effort:
            if thinking_enabled:
                logger.info(f"Thinking enabled for model {model_id}")
            return thinking_enabled

        budget = THINKING_EFFORT_BUDGETS.get(
            reasoning_effort, THINKING_EFFORT_BUDGETS["medium"]
        )
        max_budget = max(0, payload["max_tokens"] - THINKING_BUFFER_TOKENS)
        final_budget = max(0, min(budget, max_budget))

        if final_budget == 0:
            logger.warning(
                f"Thinking budget from reasoning_effort is 0 (effort: {reasoning_effort})"
            )

        if "thinking" not in payload:
            payload["thinking"] = {
                "type": "enabled",
                "budget_tokens": final_budget,
            }
        else:
            payload["thinking"]["budget_tokens"] = final_budget

        logger.info(
            f"Applied reasoning_effort '{reasoning_effort}': thinking budget set to {final_budget} tokens"
        )

        return thinking_enabled

    def _apply_thinking_constraints(self, payload: Dict[str, Any]) -> None:
        """Apply parameter constraints when thinking is enabled."""
        logger.debug("Applying thinking parameter constraints")

        # Temperature must be 1.0
        original_temp = payload.get("temperature")
        if original_temp != 1.0:
            payload["temperature"] = 1.0
            logger.debug(f"Adjusted temperature: {original_temp} -> 1.0")

        # Remove top_k
        if "top_k" in payload:
            removed_top_k = payload.pop("top_k")
            logger.debug(f"Removed top_k: {removed_top_k}")

        # Constrain top_p to [0.95, 1.0]
        if "top_p" in payload:
            original_top_p = payload["top_p"]
            if not (0.95 <= original_top_p <= 1.0):
                new_top_p = max(0.95, min(1.0, original_top_p))
                payload["top_p"] = new_top_p
                logger.debug(f"Adjusted top_p: {original_top_p} -> {new_top_p}")

        logger.debug("Thinking constraints applied")

    def _build_payload(
        self,
        body: Dict[str, Any],
        model_id: str,
        processed_messages: List[ProcessedMessage],
        system_message: Optional[str],
    ) -> Dict[str, Any]:
        """Build the API payload."""
        logger.debug(f"Building payload for model: {model_id}")

        # Apply default stream setting if not specified
        stream_setting = body.get("stream", self.valves.DEFAULT_STREAM)
        logger.debug(
            f"Stream setting: {stream_setting} (default: {self.valves.DEFAULT_STREAM})"
        )

        payload = {
            "model": model_id,
            "messages": processed_messages,
            "max_tokens": body.get("max_tokens", self.valves.DEFAULT_MAX_TOKENS),
            "temperature": body.get("temperature", 0.8),
            "top_k": body.get("top_k", 40),
            "top_p": body.get("top_p", 0.9),
            "stop_sequences": body.get("stop", []),
            "stream": stream_setting,
        }

        logger.debug(
            f"Base payload: max_tokens={payload['max_tokens']}, "
            f"temperature={payload['temperature']}, stream={payload['stream']}"
        )

        if system_message:
            payload["system"] = str(system_message)
            logger.debug(f"Added system message: {len(str(system_message))} chars")

        # Configure thinking
        thinking_enabled = self._configure_thinking(body, model_id, payload)
        if thinking_enabled:
            logger.debug("Applying thinking constraints")
            self._apply_thinking_constraints(payload)

        # Handle custom metadata
        metadata = body.get("metadata")
        if metadata and isinstance(metadata, dict):
            anthropic_metadata = {
                key.replace("anthropic_", ""): value
                for key, value in metadata.items()
                if key.startswith("anthropic_")
            }
            if anthropic_metadata:
                payload["metadata"] = anthropic_metadata
                logger.debug(f"Added metadata: {anthropic_metadata}")

        return payload

    def _build_headers(self, model_id: str) -> Dict[str, str]:
        """Build API request headers."""
        logger.debug(f"Building headers for model: {model_id}")

        headers = {
            "x-api-key": self.valves.ANTHROPIC_API_KEY,
            "anthropic-version": self.valves.ANTHROPIC_VERSION,
            "content-type": "application/json",
        }

        # Add beta header for Claude 4 interleaved thinking
        if self._supports_thinking(model_id) and model_id.startswith(
            ("claude-opus-4-", "claude-sonnet-4-")
        ):
            headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
            logger.debug("Added interleaved thinking beta header")

        return headers

    def _handle_thinking_stream_start(
        self, data: Dict[str, Any], thinking_blocks: List[ThinkingBlock], model_id: str
    ) -> Tuple[bool, Optional[str]]:
        """Handle the start of thinking-related stream events."""
        if data.get("type") != "content_block_start":
            return False, None

        content_block = data.get("content_block", {})
        block_type = content_block.get("type")

        match block_type:
            case "thinking":
                logger.debug("Started thinking block")
                return True, None
            case "redacted_thinking":
                redacted_data = content_block.get("data", "")
                thinking_blocks.append(
                    {"type": "redacted_thinking", "data": redacted_data}
                )

                # Emit redacted notice
                reasoning_chunk = {
                    "type": "content_block_delta",
                    "delta": {
                        "type": "reasoning_content",
                        "text": "[Some reasoning content was redacted for safety]",
                    },
                }
                openai_chunk = self.convert_anthropic_stream_to_openai(
                    reasoning_chunk, model_id, thinking_blocks
                )
                chunk_data = f"data: {json.dumps(openai_chunk)}\n\n"
                logger.info("Emitted redacted thinking notice")
                return False, chunk_data
            case _:
                return False, None

    def _process_thinking_delta(
        self,
        delta: Dict[str, Any],
        accumulated_reasoning: str,
        current_signature: str,
        model_id: str,
        thinking_blocks: List[ThinkingBlock],
    ) -> Tuple[str, str, Optional[str]]:
        """Process thinking delta and return updated state and chunk to emit."""
        delta_type = delta.get("type")

        match delta_type:
            case "signature_delta":
                sig_chunk = delta.get("signature", "")
                current_signature += sig_chunk
                return accumulated_reasoning, current_signature, None
            case "thinking_delta":
                text = delta.get("thinking", "")
            case "text_delta":
                text = delta.get("text", "")
            case _:
                text = delta.get("text", "")

        if not text:
            return accumulated_reasoning, current_signature, None

        accumulated_reasoning += text

        # Emit reasoning content
        reasoning_chunk = {
            "type": "content_block_delta",
            "delta": {"type": "reasoning_content", "text": text},
        }
        openai_chunk = self.convert_anthropic_stream_to_openai(
            reasoning_chunk, model_id, thinking_blocks
        )
        chunk_data = f"data: {json.dumps(openai_chunk)}\n\n"

        return accumulated_reasoning, current_signature, chunk_data

    def _handle_thinking_stream_end(
        self,
        accumulated_reasoning: str,
        current_signature: str,
        thinking_blocks: List[ThinkingBlock],
    ) -> None:
        """Handle the end of a thinking stream block."""
        logger.debug("Handling thinking stream end")
        logger.debug(f"Thinking complete: {len(accumulated_reasoning)} chars")

        thinking_block: ThinkingBlock = {
            "type": "thinking",
            "thinking": accumulated_reasoning,
        }

        if current_signature:
            thinking_block["signature"] = current_signature
            logger.debug(f"Stored signature: {len(current_signature)} chars")

        thinking_blocks.append(thinking_block)
        logger.debug(f"Total thinking blocks: {len(thinking_blocks)}")

    def _process_stream_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Process a single stream line and return parsed data."""
        if not line or not line.startswith("data: "):
            return None

        try:
            data = json.loads(line[6:])
            logger.debug(f"Parsed stream data: type={data.get('type', 'unknown')}")
            return data
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse stream JSON: {line}, Error: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error parsing stream line: {e}")
            return None

    def _emit_final_stream_chunk(
        self, data: Dict[str, Any], thinking_blocks: List[ThinkingBlock], model_id: str
    ) -> str:
        """Emit the final stream chunk with thinking blocks."""
        final_chunk = self.convert_anthropic_stream_to_openai(
            data, model_id, thinking_blocks
        )

        if thinking_blocks:
            final_chunk["anthropic_thinking_blocks"] = thinking_blocks
            logger.debug(f"Added {len(thinking_blocks)} thinking blocks to final chunk")

        return f"data: {json.dumps(final_chunk)}\n\n"

    def _stream_response(
        self, url: str, headers: Dict[str, str], payload: Dict[str, Any]
    ) -> Generator[str, None, None]:
        """Handle streaming response."""
        model_id = payload.get("model", "claude")
        logger.info(f"Starting stream response for model: {model_id}")

        # Stream state
        accumulated_reasoning = ""
        current_signature = ""
        in_thinking_block = False
        thinking_blocks: List[ThinkingBlock] = []
        chunk_count = 0

        try:
            logger.info(f"Starting stream request to: {url}")
            with requests.post(
                url, headers=headers, json=payload, stream=True, timeout=REQUEST_TIMEOUT
            ) as response:
                if response.status_code != 200:
                    error_text = response.text
                    logger.error(f"Stream API error: HTTP {response.status_code}")
                    if error_text:
                        logger.debug(f"Error response: {error_text[:500]}")

                    # Parse and format error for better user experience
                    user_friendly_error = self._format_api_error(
                        response.status_code, error_text
                    )
                    raise Exception(user_friendly_error)

                for line in response.iter_lines():
                    if not line:
                        continue

                    line_str = line.decode("utf-8")
                    chunk_count += 1

                    data = self._process_stream_line(line_str)
                    if not data:
                        continue

                    # Handle thinking content
                    if in_thinking_block:
                        if data.get("type") == "content_block_delta":
                            delta = data.get("delta", {})
                            accumulated_reasoning, current_signature, chunk = (
                                self._process_thinking_delta(
                                    delta,
                                    accumulated_reasoning,
                                    current_signature,
                                    model_id,
                                    thinking_blocks,
                                )
                            )
                            if chunk:
                                yield chunk
                            continue
                        elif data.get("type") == "content_block_stop":
                            self._handle_thinking_stream_end(
                                accumulated_reasoning,
                                current_signature,
                                thinking_blocks,
                            )
                            in_thinking_block = False
                            accumulated_reasoning = ""
                            current_signature = ""
                            continue

                    # Handle thinking block start
                    thinking_started, chunk = self._handle_thinking_stream_start(
                        data, thinking_blocks, model_id
                    )
                    if thinking_started:
                        in_thinking_block = True
                        accumulated_reasoning = ""
                        current_signature = ""
                        continue
                    if chunk:
                        yield chunk
                        continue

                    # Handle final message
                    if data.get("type") == "message_stop":
                        final_chunk = self._emit_final_stream_chunk(
                            data, thinking_blocks, model_id
                        )
                        yield final_chunk
                        break

                    # Convert and emit regular content
                    if not in_thinking_block:
                        openai_chunk = self.convert_anthropic_stream_to_openai(
                            data, model_id, thinking_blocks
                        )
                        yield f"data: {json.dumps(openai_chunk)}\n\n"
                        time.sleep(STREAM_DELAY)

            # Log thinking token information (consistent with non-streaming)
            if thinking_blocks:
                total_thinking_chars = sum(
                    len(block.get("thinking", ""))
                    for block in thinking_blocks
                    if block.get("type") == "thinking"
                )
                logger.info(
                    f"Stream response contains {len(thinking_blocks)} thinking blocks ({total_thinking_chars} chars)"
                )

            logger.info(f"Stream complete: processed {chunk_count} chunks")
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error(f"Stream response error: {e}")
            error_chunk = {
                "id": f"chatcmpl-{uuid4()}",
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": model_id,
                "choices": [
                    {
                        "index": 0,
                        "delta": {"content": f"Error: {e}"},
                        "finish_reason": "stop",
                    }
                ],
            }
            yield f"data: {json.dumps(error_chunk)}\n\n"
            yield "data: [DONE]\n\n"

    def _non_stream_response(
        self, url: str, headers: Dict[str, str], payload: Dict[str, Any]
    ) -> APIResponse:
        """Handle non-streaming response."""
        model_id = payload.get("model", "claude")

        try:
            logger.info(f"Making non-stream request for model {model_id} to {url}")
            response = requests.post(
                url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
            )

            if response.status_code != 200:
                error_text = response.text
                logger.error(f"Non-stream API error: HTTP {response.status_code}")
                if error_text:
                    logger.debug(f"Error response: {error_text[:500]}")

                # Parse and format error for better user experience
                user_friendly_error = self._format_api_error(
                    response.status_code, error_text
                )
                raise Exception(user_friendly_error)

            anthropic_response = response.json()

            # Log content analysis
            content_blocks = anthropic_response.get("content", [])
            if content_blocks:
                thinking_blocks = [
                    b for b in content_blocks if b.get("type") == "thinking"
                ]
                if thinking_blocks:
                    total_thinking_chars = sum(
                        len(b.get("thinking", "")) for b in thinking_blocks
                    )
                    logger.info(
                        f"Response contains {len(thinking_blocks)} thinking blocks ({total_thinking_chars} chars)"
                    )

            converted_response = self.convert_anthropic_response_to_openai(
                anthropic_response, model_id
            )
            logger.info("Response conversion complete")
            return converted_response
        except Exception as e:
            logger.error(f"Non-stream request error: {e}")
            return {
                "id": f"chatcmpl-{uuid4()}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": model_id,
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": f"Error: {e}"},
                        "finish_reason": "stop",
                    }
                ],
                "usage": {
                    "prompt_tokens": 0,
                    "completion_tokens": 0,
                    "total_tokens": 0,
                },
            }

    def pipe(
        self, body: Dict[str, Any]
    ) -> Union[str, Generator[str, None, None], APIResponse]:
        """Main entrypoint."""
        model_name = body.get("model", "unknown")
        stream_mode = body.get("stream", "not specified")
        message_count = len(body.get("messages", []))

        logger.info(
            f"Processing request: model={model_name}, stream={stream_mode}, messages={message_count}"
        )

        try:
            # Extract and process messages
            system_message, messages = pop_system_message(body["messages"])
            processed_messages, _ = self._process_messages(messages)

            # Extract model ID
            full_model = body["model"]
            model_id = (
                full_model[full_model.find(".") + 1 :]
                if "." in full_model
                else full_model
            )

            # Build payload and headers
            payload = self._build_payload(
                body, model_id, processed_messages, system_message
            )
            headers = self._build_headers(model_id)
            url = f"{ANTHROPIC_API_BASE}/messages"

            thinking_enabled = payload.get("thinking") is not None
            logger.info(
                f"Request configured: model={model_id}, stream={payload.get('stream')}, "
                f"max_tokens={payload.get('max_tokens')}, thinking={thinking_enabled}"
            )

            # Route to the appropriate handler depending on whether we're
            # streaming or not.
            if payload.get("stream", False):
                return self._stream_response(url, headers, payload)

            return self._non_stream_response(url, headers, payload)
        except Exception as e:
            logger.error(f"Request processing error: {e}")
            return f"Error: {e}"
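

# ---------------------------------------------------------------------------
# Minimal local smoke test - a sketch, not part of the Open WebUI plugin API.
# Assumes open-webui is importable (for pop_system_message above) and that an
# ANTHROPIC_API_KEY environment variable is set; the model ID below is
# illustrative - use one returned by pipes().
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import os

    pipe = Pipe()
    pipe.valves.ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")

    # List the models the manifold would expose to Open WebUI.
    for model in pipe.pipes():
        print(model["id"], "-", model["name"])

    # Non-streaming request with a small thinking budget (illustrative values).
    result = pipe.pipe(
        {
            "model": "anthropic.claude-sonnet-4-20250514",
            "messages": [{"role": "user", "content": "Say hello in one sentence."}],
            "stream": False,
            "max_tokens": 2048,
            "thinking_budget": 1024,
        }
    )
    print(result)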