Last active
March 3, 2025 15:28
-
-
Save LionsAd/ed81504e2663dcf33a3d2efc2f9a31f4 to your computer and use it in GitHub Desktop.
anthropic_manifold_pipe--with-thinking-3-7-async-and-logging.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
title: Anthropic Manifold Pipe | |
authors: justinh-rahb and christian-taillon | |
author_url: https://github.com/justinh-rahb | |
funding_url: https://github.com/open-webui | |
version: 0.2.4 | |
required_open_webui_version: 0.3.17 | |
license: MIT | |
""" | |
import os | |
import aiohttp | |
import json | |
import logging | |
from typing import List, Union, Generator, Iterator, AsyncGenerator | |
from pydantic import BaseModel, Field | |
from open_webui.utils.misc import pop_system_message | |
from open_webui.env import SRC_LOG_LEVELS | |
class Pipe:
    """Anthropic manifold pipe for Open WebUI.

    Exposes the Claude model family as Open WebUI models. Handles
    OpenAI-style image inputs (base64 data-URLs and plain URLs) with size
    validation, optional extended-thinking support for Claude 3.7 models,
    and async streaming / non-streaming calls to the Anthropic Messages API.
    """

    class Valves(BaseModel):
        # API key for api.anthropic.com; defaults from the
        # ANTHROPIC_API_KEY environment variable in __init__.
        ANTHROPIC_API_KEY: str = Field(default="")

    # Mapping for reasoning effort to thinking token budgets.
    # "none" disables thinking; unknown values are tried as raw integers.
    REASONING_EFFORT_BUDGET_TOKEN_MAP = {
        "none": None,
        "low": 1024,
        "medium": 4096,
        "high": 16384,
        "max": 32768,
    }

    # Maximum combined (max_tokens + budget_tokens) limit for Claude 3.7 models
    MAX_COMBINED_TOKENS = 64000

    def __init__(self):
        self.type = "manifold"
        self.id = "anthropic"
        self.name = "anthropic/"
        self.valves = self.Valves(
            **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")}
        )
        self.MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB per image
        # Initialize logger
        self.log = logging.getLogger("anthropic.pipe")
        self.log.setLevel(SRC_LOG_LEVELS["OPENAI"])

    def get_anthropic_models(self) -> List[dict]:
        """Return list of available Anthropic models."""
        return [
            {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"},
            {"id": "claude-3-opus-20240229", "name": "claude-3-opus"},
            {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
            {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"},
            {"id": "claude-3-5-haiku-latest", "name": "claude-3.5-haiku"},
            {"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"},
            {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
            {"id": "claude-3-5-sonnet-latest", "name": "claude-3.5-sonnet"},
            {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"},
        ]

    def pipes(self) -> List[dict]:
        """Return list of pipe configurations."""
        return self.get_anthropic_models()

    async def process_image(self, image_data: dict) -> dict:
        """Convert an OpenAI-style ``image_url`` item to an Anthropic image block.

        Base64 data-URLs have their decoded size estimated locally; plain
        URLs are checked via an async HEAD request's content-length header.

        Raises:
            ValueError: if the image exceeds the 5MB per-image limit.
        """
        self.log.debug(f"Processing image: {image_data['image_url']['url'][:50]}...")
        if image_data["image_url"]["url"].startswith("data:image"):
            mime_type, base64_data = image_data["image_url"]["url"].split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]

            # Check base64 image size
            image_size = len(base64_data) * 3 / 4  # Convert base64 size to bytes
            self.log.debug(f"Base64 image size: {image_size} bytes")
            if image_size > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
                )

            return {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }
        else:
            # For URL images, perform async size check via HEAD request.
            # NOTE(review): a missing content-length header defaults to 0 and
            # passes the check — the API will still enforce its own limit.
            url = image_data["image_url"]["url"]
            async with aiohttp.ClientSession() as session:
                async with session.head(url, allow_redirects=True) as response:
                    content_length = int(response.headers.get("content-length", 0))
                    self.log.debug(f"URL image size: {content_length} bytes")
                    if content_length > self.MAX_IMAGE_SIZE:
                        raise ValueError(
                            f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
                        )
            return {
                "type": "image",
                "source": {"type": "url", "url": url},
            }

    async def pipe(self, body: dict) -> Union[str, AsyncGenerator]:
        """Process the input body and return a response.

        Translates an OpenAI-style request body into an Anthropic Messages
        API payload (including image blocks and optional thinking config),
        then dispatches to the streaming or non-streaming sender.

        Raises:
            ValueError: if per-message or total image limits are exceeded.
        """
        self.log.info("Starting pipe processing")
        self.log.debug(f"Input body: {json.dumps(body, indent=2)}")
        system_message, messages = pop_system_message(body["messages"])

        processed_messages = []
        total_image_size = 0
        for message in messages:
            processed_content = []
            if isinstance(message.get("content"), list):
                # Check number of images per message (Anthropic limit: 5 per message)
                image_count = sum(
                    1 for item in message["content"] if item["type"] == "image_url"
                )
                self.log.debug(f"Image count in message: {image_count}")
                if image_count > 5:
                    raise ValueError("Maximum of 5 images per message exceeded")

                for item in message["content"]:
                    if item["type"] == "text":
                        processed_content.append({"type": "text", "text": item["text"]})
                    elif item["type"] == "image_url":
                        processed_image = await self.process_image(item)
                        processed_content.append(processed_image)

                        # Track total size for base64 images only; URL images
                        # are not downloaded here, so their size is unknown.
                        if processed_image["source"]["type"] == "base64":
                            image_size = len(processed_image["source"]["data"]) * 3 / 4
                            total_image_size += image_size
                            self.log.debug(f"Total image size so far: {total_image_size} bytes")
                            if (
                                total_image_size > 100 * 1024 * 1024
                            ):  # 100MB total limit
                                raise ValueError(
                                    "Total size of images exceeds 100 MB limit"
                                )
            else:
                processed_content = [
                    {"type": "text", "text": message.get("content", "")}
                ]
            processed_messages.append(
                {"role": message["role"], "content": processed_content}
            )

        payload = {
            # Open WebUI prefixes the model id with "<pipe id>."; strip it.
            # (str.find returns -1 when no dot is present, so +1 keeps the
            # whole string unchanged in that case.)
            "model": body["model"][body["model"].find(".") + 1 :],
            "messages": processed_messages,
            "max_tokens": body.get("max_tokens", 4096),
            "temperature": body.get("temperature", 0.8),
            "top_k": body.get("top_k", 40),
            "top_p": body.get("top_p", 0.9),
            # The body may carry an explicit "stop": None; normalize to a
            # list so the API never receives null stop_sequences.
            "stop_sequences": body.get("stop") or [],
            **({"system": str(system_message)} if system_message else {}),
            "stream": body.get("stream", False),
        }
        self.log.debug(f"Request payload: {json.dumps(payload, indent=2)}")

        headers = {
            "x-api-key": self.valves.ANTHROPIC_API_KEY,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }

        # Add thinking support for Claude 3.7 models
        model_id = payload["model"]
        supports_thinking = model_id.startswith("claude-3-7")
        reasoning_effort = body.get("reasoning_effort", "none")
        budget_tokens = self.REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)

        # Allow integer values for reasoning_effort (an explicit token budget)
        if budget_tokens is None and reasoning_effort not in self.REASONING_EFFORT_BUDGET_TOKEN_MAP:
            try:
                budget_tokens = int(reasoning_effort)
            except (ValueError, TypeError):
                budget_tokens = None

        if supports_thinking and budget_tokens:
            max_tokens = payload.get("max_tokens", 4096)
            # The Anthropic API requires max_tokens to cover the thinking
            # budget as well, so the two are summed below.
            combined_tokens = budget_tokens + max_tokens
            self.log.debug(f"Thinking enabled: budget_tokens={budget_tokens}, max_tokens={max_tokens}")

            if combined_tokens > self.MAX_COMBINED_TOKENS:
                self.log.error(f"Combined tokens exceed limit: {combined_tokens} > {self.MAX_COMBINED_TOKENS}")
                return f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {self.MAX_COMBINED_TOKENS}"

            payload["max_tokens"] = combined_tokens
            payload["thinking"] = {
                "type": "enabled",
                "budget_tokens": budget_tokens,
            }
            # Thinking requires temperature 1.0 and does not allow
            # top_k / top_p sampling parameters.
            payload["temperature"] = 1.0
            payload.pop("top_k", None)
            payload.pop("top_p", None)
            self.log.debug("Adjusted payload for thinking support")

        url = "https://api.anthropic.com/v1/messages"
        self.log.info(f"Sending request to {url} (stream={payload['stream']})")
        if payload["stream"]:
            return self.stream_response(url, headers, payload)
        else:
            return await self.non_stream_response(url, headers, payload)

    async def stream_response(self, url, headers, payload) -> AsyncGenerator[str, None]:
        """Yield text chunks from a streaming Messages API response.

        Thinking content is wrapped in ``<think>``/``</think>`` markers
        (the closing tag is emitted on the signature_delta event that ends
        a thinking block). Malformed or unexpected events are logged and
        skipped rather than aborting the stream.
        """
        self.log.info("Starting streaming response")
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=payload) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.log.error(f"HTTP Error {response.status}: {error_text}")
                        yield f"Error: HTTP Error {response.status}: {error_text}"
                        return

                    async for line in response.content:
                        if line:
                            line = line.decode("utf-8")
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    self.log.debug(f"Received stream event: {data['type']}")
                                    if data["type"] == "content_block_start":
                                        if data["content_block"]["type"] == "thinking":
                                            yield "<think>"
                                        else:
                                            yield data["content_block"]["text"]
                                    elif data["type"] == "content_block_delta":
                                        if data["delta"]["type"] == "thinking_delta":
                                            yield data["delta"]["thinking"]
                                        elif data["delta"]["type"] == "signature_delta":
                                            yield "\n </think> \n\n"
                                        else:
                                            yield data["delta"]["text"]
                                    elif data["type"] == "message_stop":
                                        self.log.info("Stream completed")
                                        break
                                    elif data["type"] == "message":
                                        for content in data.get("content", []):
                                            if content["type"] == "text":
                                                yield content["text"]
                                except json.JSONDecodeError:
                                    self.log.error(f"Failed to parse JSON: {line}")
                                    continue
                                except KeyError as e:
                                    # Unknown block/delta shapes (e.g. tool_use)
                                    # are skipped instead of killing the stream.
                                    self.log.error(f"Unexpected data structure: {e}")
                                    self.log.debug(f"Full data: {data}")
                                    continue
        except Exception as e:
            self.log.exception(f"General error in stream_response method: {e}")
            yield f"Error: {e}"

    async def non_stream_response(self, url, headers, payload) -> str:
        """Send a non-streaming request and return the first text block.

        Returns an ``"Error: ..."`` string (rather than raising) on HTTP
        or transport failures, and ``""`` when the response has no text.
        """
        self.log.info("Starting non-streaming response")
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=payload) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.log.error(f"HTTP Error {response.status}: {error_text}")
                        raise Exception(f"HTTP Error {response.status}: {error_text}")
                    res = await response.json()
                    self.log.debug(f"Received response: {json.dumps(res, indent=2)}")
                    for content in res.get("content", []):
                        if content.get("type") == "text":
                            self.log.info("Non-streaming response completed")
                            return content["text"]
                    self.log.info("Non-streaming response completed (empty content)")
                    return ""
        except Exception as e:
            self.log.exception(f"Failed non-stream request: {e}")
            return f"Error: {e}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment