anthropic_manifold_pipe--with-thinking-3-7.py
diff --git a/amp.py b/amp.py
index 2dfde0f..2de8bbd 100644
--- a/amp.py
+++ b/amp.py
@@ -21,6 +21,18 @@ class Pipe:
     class Valves(BaseModel):
         ANTHROPIC_API_KEY: str = Field(default="")
 
+    # Mapping for reasoning effort to token budgets
+    REASONING_EFFORT_BUDGET_TOKEN_MAP = {
+        "none": None,
+        "low": 1024,
+        "medium": 4096,
+        "high": 16384,
+        "max": 32768,
+    }
+
+    # Maximum combined token limit for Claude 3.7 models
+    MAX_COMBINED_TOKENS = 64000
+
     def __init__(self):
         self.type = "manifold"
         self.id = "anthropic"
@@ -32,6 +44,7 @@ class Pipe:
         pass
 
     def get_anthropic_models(self):
+        """Return list of available Anthropic models."""
         return [
             {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"},
             {"id": "claude-3-opus-20240229", "name": "claude-3-opus"},
@@ -45,6 +58,7 @@ class Pipe:
         ]
 
     def pipes(self) -> List[dict]:
+        """Return list of pipe configurations."""
         return self.get_anthropic_models()
 
     def process_image(self, image_data):
@@ -85,6 +99,7 @@ class Pipe:
         }
 
     def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
+        """Process the input body and return a response."""
         system_message, messages = pop_system_message(body["messages"])
 
         processed_messages = []
@@ -93,6 +108,10 @@ class Pipe:
         for message in messages:
             processed_content = []
             if isinstance(message.get("content"), list):
+                # Check number of images per message (Anthropic limit: 5 per message)
+                image_count = sum(1 for item in message["content"] if item["type"] == "image_url")
+                if image_count > 5:
+                    raise ValueError("Maximum of 5 images per message exceeded")
                 for item in message["content"]:
                     if item["type"] == "text":
                         processed_content.append({"type": "text", "text": item["text"]})
@@ -137,6 +156,35 @@ class Pipe:
             "content-type": "application/json",
         }
 
+        # Add thinking support for Claude 3.7 models
+        model_id = payload["model"]
+        supports_thinking = model_id.startswith("claude-3-7")
+        reasoning_effort = body.get("reasoning_effort", "none")
+        budget_tokens = self.REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
+
+        # Allow integer values for reasoning_effort
+        if budget_tokens is None and reasoning_effort not in self.REASONING_EFFORT_BUDGET_TOKEN_MAP:
+            try:
+                budget_tokens = int(reasoning_effort)
+            except (ValueError, TypeError):
+                budget_tokens = None
+
+        if supports_thinking and budget_tokens:
+            max_tokens = payload.get("max_tokens", 4096)
+            combined_tokens = budget_tokens + max_tokens
+            if combined_tokens > self.MAX_COMBINED_TOKENS:
+                return f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {self.MAX_COMBINED_TOKENS}"
+            payload["max_tokens"] = combined_tokens
+            payload["thinking"] = {
+                "type": "enabled",
+                "budget_tokens": budget_tokens,
+            }
+            payload["temperature"] = 1.0
+            if "top_k" in payload:
+                del payload["top_k"]
+            if "top_p" in payload:
+                del payload["top_p"]
+
         url = "https://api.anthropic.com/v1/messages"
 
         try:
@@ -168,9 +216,17 @@ class Pipe:
                             try:
                                 data = json.loads(line[6:])
                                 if data["type"] == "content_block_start":
-                                    yield data["content_block"]["text"]
+                                    if data["content_block"]["type"] == "thinking":
+                                        yield "<think>"
+                                    else:
+                                        yield data["content_block"]["text"]
                                 elif data["type"] == "content_block_delta":
-                                    yield data["delta"]["text"]
+                                    if data["delta"]["type"] == "thinking_delta":
+                                        yield data["delta"]["thinking"]
+                                    elif data["delta"]["type"] == "signature_delta":
+                                        yield "\n </think> \n\n"
+                                    else:
+                                        yield data["delta"]["text"]
                                 elif data["type"] == "message_stop":
                                     break
                                 elif data["type"] == "message":
@@ -195,6 +251,7 @@ class Pipe:
             yield f"Error: {e}"
 
     def non_stream_response(self, url, headers, payload):
+        """Get a non-streaming response from the Anthropic API."""
         try:
             response = requests.post(
                 url, headers=headers, json=payload, timeout=(3.05, 60)
@@ -203,10 +260,10 @@ class Pipe:
                 raise Exception(f"HTTP Error {response.status_code}: {response.text}")
 
             res = response.json()
-            return (
-                res["content"][0]["text"] if "content" in res and res["content"] else ""
-            )
+            for content in res.get("content", []):
+                if content.get("type") == "text":
+                    return content["text"]
+            return ""
         except requests.exceptions.RequestException as e:
             print(f"Failed non-stream request: {e}")
             return f"Error: {e}"
-
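For reference, here is a minimal standalone sketch of the budget arithmetic the patch adds: a reasoning_effort value (a named level or a raw integer string) is mapped to a thinking token budget, added to max_tokens, and rejected if the combined total exceeds the 64,000-token cap. The mapping and the cap are copied from the patch; the resolve_budget helper and the example values are illustrative only, not part of the gist.

# Illustrative sketch of the thinking-budget arithmetic in the patch above.
# The mapping and MAX_COMBINED_TOKENS mirror the patch; resolve_budget is a
# hypothetical helper used only for demonstration.
REASONING_EFFORT_BUDGET_TOKEN_MAP = {
    "none": None,
    "low": 1024,
    "medium": 4096,
    "high": 16384,
    "max": 32768,
}
MAX_COMBINED_TOKENS = 64000


def resolve_budget(reasoning_effort, max_tokens=4096):
    """Return (budget_tokens, combined max_tokens), or (None, max_tokens) if thinking stays off."""
    budget = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
    if budget is None and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP:
        try:
            budget = int(reasoning_effort)  # raw integer budgets are allowed
        except (ValueError, TypeError):
            budget = None
    if not budget:
        return None, max_tokens
    combined = budget + max_tokens
    if combined > MAX_COMBINED_TOKENS:
        raise ValueError(f"Combined tokens {combined} exceed the {MAX_COMBINED_TOKENS} limit")
    return budget, combined


print(resolve_budget("high"))  # (16384, 20480) with the default max_tokens of 4096
print(resolve_budget("2048"))  # (2048, 6144): raw integer budget
print(resolve_budget("none"))  # (None, 4096): thinking disabled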
""" | |
title: Anthropic Manifold Pipe | |
authors: justinh-rahb and christian-taillon | |
author_url: https://github.com/justinh-rahb | |
funding_url: https://github.com/open-webui | |
version: 0.2.4 | |
required_open_webui_version: 0.3.17 | |
license: MIT | |
""" | |
import os | |
import requests | |
import json | |
import time | |
from typing import List, Union, Generator, Iterator | |
from pydantic import BaseModel, Field | |
from open_webui.utils.misc import pop_system_message | |
class Pipe:
    class Valves(BaseModel):
        ANTHROPIC_API_KEY: str = Field(default="")

    # Mapping for reasoning effort to token budgets
    REASONING_EFFORT_BUDGET_TOKEN_MAP = {
        "none": None,
        "low": 1024,
        "medium": 4096,
        "high": 16384,
        "max": 32768,
    }

    # Maximum combined token limit for Claude 3.7 models
    MAX_COMBINED_TOKENS = 64000

    def __init__(self):
        self.type = "manifold"
        self.id = "anthropic"
        self.name = "anthropic/"
        self.valves = self.Valves(
            **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")}
        )
        self.MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB per image
        pass

    def get_anthropic_models(self):
        """Return list of available Anthropic models."""
        return [
            {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"},
            {"id": "claude-3-opus-20240229", "name": "claude-3-opus"},
            {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
            {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"},
            {"id": "claude-3-5-haiku-latest", "name": "claude-3.5-haiku"},
            {"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"},
            {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
            {"id": "claude-3-5-sonnet-latest", "name": "claude-3.5-sonnet"},
            {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"},
        ]

    def pipes(self) -> List[dict]:
        """Return list of pipe configurations."""
        return self.get_anthropic_models()

    def process_image(self, image_data):
        """Process image data with size validation."""
        if image_data["image_url"]["url"].startswith("data:image"):
            mime_type, base64_data = image_data["image_url"]["url"].split(",", 1)
            media_type = mime_type.split(":")[1].split(";")[0]

            # Check base64 image size
            image_size = len(base64_data) * 3 / 4  # Convert base64 size to bytes
            if image_size > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
                )

            return {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64_data,
                },
            }
        else:
            # For URL images, perform size check after fetching
            url = image_data["image_url"]["url"]
            response = requests.head(url, allow_redirects=True)
            content_length = int(response.headers.get("content-length", 0))

            if content_length > self.MAX_IMAGE_SIZE:
                raise ValueError(
                    f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
                )

            return {
                "type": "image",
                "source": {"type": "url", "url": url},
            }

    def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
        """Process the input body and return a response."""
        system_message, messages = pop_system_message(body["messages"])

        processed_messages = []
        total_image_size = 0

        for message in messages:
            processed_content = []
            if isinstance(message.get("content"), list):
                # Check number of images per message (Anthropic limit: 5 per message)
                image_count = sum(1 for item in message["content"] if item["type"] == "image_url")
                if image_count > 5:
                    raise ValueError("Maximum of 5 images per message exceeded")
                for item in message["content"]:
                    if item["type"] == "text":
                        processed_content.append({"type": "text", "text": item["text"]})
                    elif item["type"] == "image_url":
                        processed_image = self.process_image(item)
                        processed_content.append(processed_image)

                        # Track total size for base64 images
                        if processed_image["source"]["type"] == "base64":
                            image_size = len(processed_image["source"]["data"]) * 3 / 4
                            total_image_size += image_size
                            if (
                                total_image_size > 100 * 1024 * 1024
                            ):  # 100MB total limit
                                raise ValueError(
                                    "Total size of images exceeds 100 MB limit"
                                )
            else:
                processed_content = [
                    {"type": "text", "text": message.get("content", "")}
                ]

            processed_messages.append(
                {"role": message["role"], "content": processed_content}
            )

        payload = {
            "model": body["model"][body["model"].find(".") + 1 :],
            "messages": processed_messages,
            "max_tokens": body.get("max_tokens", 4096),
            "temperature": body.get("temperature", 0.8),
            "top_k": body.get("top_k", 40),
            "top_p": body.get("top_p", 0.9),
            "stop_sequences": body.get("stop", []),
            **({"system": str(system_message)} if system_message else {}),
            "stream": body.get("stream", False),
        }

        headers = {
            "x-api-key": self.valves.ANTHROPIC_API_KEY,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }

        # Add thinking support for Claude 3.7 models
        model_id = payload["model"]
        supports_thinking = model_id.startswith("claude-3-7")
        reasoning_effort = body.get("reasoning_effort", "none")
        budget_tokens = self.REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)

        # Allow integer values for reasoning_effort
        if budget_tokens is None and reasoning_effort not in self.REASONING_EFFORT_BUDGET_TOKEN_MAP:
            try:
                budget_tokens = int(reasoning_effort)
            except (ValueError, TypeError):
                budget_tokens = None

        if supports_thinking and budget_tokens:
            max_tokens = payload.get("max_tokens", 4096)
            combined_tokens = budget_tokens + max_tokens
            if combined_tokens > self.MAX_COMBINED_TOKENS:
                return f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {self.MAX_COMBINED_TOKENS}"
            payload["max_tokens"] = combined_tokens
            payload["thinking"] = {
                "type": "enabled",
                "budget_tokens": budget_tokens,
            }
            payload["temperature"] = 1.0
            if "top_k" in payload:
                del payload["top_k"]
            if "top_p" in payload:
                del payload["top_p"]

        url = "https://api.anthropic.com/v1/messages"

        try:
            if body.get("stream", False):
                return self.stream_response(url, headers, payload)
            else:
                return self.non_stream_response(url, headers, payload)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return f"Error: Request failed: {e}"
        except Exception as e:
            print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def stream_response(self, url, headers, payload):
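        """Stream the Anthropic response, wrapping thinking output in <think> ... </think> tags."""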
        try:
            with requests.post(
                url, headers=headers, json=payload, stream=True, timeout=(3.05, 60)
            ) as response:
                if response.status_code != 200:
                    raise Exception(
                        f"HTTP Error {response.status_code}: {response.text}"
                    )

                for line in response.iter_lines():
                    if line:
                        line = line.decode("utf-8")
                        if line.startswith("data: "):
                            try:
                                data = json.loads(line[6:])
                                if data["type"] == "content_block_start":
                                    if data["content_block"]["type"] == "thinking":
                                        yield "<think>"
                                    else:
                                        yield data["content_block"]["text"]
                                elif data["type"] == "content_block_delta":
                                    if data["delta"]["type"] == "thinking_delta":
                                        yield data["delta"]["thinking"]
                                    elif data["delta"]["type"] == "signature_delta":
                                        yield "\n </think> \n\n"
                                    else:
                                        yield data["delta"]["text"]
                                elif data["type"] == "message_stop":
                                    break
                                elif data["type"] == "message":
                                    for content in data.get("content", []):
                                        if content["type"] == "text":
                                            yield content["text"]

                                time.sleep(
                                    0.01
                                )  # Delay to avoid overwhelming the client
                            except json.JSONDecodeError:
                                print(f"Failed to parse JSON: {line}")
                            except KeyError as e:
                                print(f"Unexpected data structure: {e}")
                                print(f"Full data: {data}")
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            yield f"Error: Request failed: {e}"
        except Exception as e:
            print(f"General error in stream_response method: {e}")
            yield f"Error: {e}"

    def non_stream_response(self, url, headers, payload):
        """Get a non-streaming response from the Anthropic API."""
        try:
            response = requests.post(
                url, headers=headers, json=payload, timeout=(3.05, 60)
            )
            if response.status_code != 200:
                raise Exception(f"HTTP Error {response.status_code}: {response.text}")

            res = response.json()
            for content in res.get("content", []):
                if content.get("type") == "text":
                    return content["text"]
            return ""
        except requests.exceptions.RequestException as e:
            print(f"Failed non-stream request: {e}")
            return f"Error: {e}"
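Open WebUI normally constructs the request body and invokes the pipe itself. The snippet below is only a rough sketch of how the class could be exercised directly, assuming it runs in the same module as the code above; the model id, prompt, and API key are placeholders, and reasoning_effort is the field the patch reads to enable extended thinking.

# Illustrative only: Open WebUI normally drives the pipe. This hand-rolled body
# shows the fields pipe() reads; the API key, model id, and prompt are placeholders.
import os

os.environ.setdefault("ANTHROPIC_API_KEY", "sk-ant-...")  # placeholder key

pipe = Pipe()
body = {
    "model": "anthropic.claude-3-7-sonnet-20250219",  # prefix before "." is stripped
    "messages": [{"role": "user", "content": "Explain manifolds briefly."}],
    "max_tokens": 1024,
    "stream": True,
    "reasoning_effort": "medium",  # or a raw token budget such as "8192"
}

# With stream=True, pipe() returns a generator; thinking tokens arrive wrapped
# in <think> ... </think> before the visible answer.
for chunk in pipe.pipe(body):
    print(chunk, end="", flush=True)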