Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save LionsAd/ed81504e2663dcf33a3d2efc2f9a31f4 to your computer and use it in GitHub Desktop.
anthropic_manifold_pipe--with-thinking-3-7-async-and-logging.py
"""
title: Anthropic Manifold Pipe
authors: justinh-rahb and christian-taillon
author_url: https://github.com/justinh-rahb
funding_url: https://github.com/open-webui
version: 0.2.4
required_open_webui_version: 0.3.17
license: MIT
"""
import os
import aiohttp
import json
import logging
from typing import List, Union, Generator, Iterator, AsyncGenerator
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message
from open_webui.env import SRC_LOG_LEVELS
class Pipe:
    """Open WebUI manifold pipe for the Anthropic Messages API."""

    class Valves(BaseModel):
        # Admin-configurable settings; the key is seeded from the
        # ANTHROPIC_API_KEY environment variable in __init__.
        ANTHROPIC_API_KEY: str = Field(default="")

    # Mapping for reasoning effort to token budgets.
    # "none" (None) disables extended thinking entirely.
    REASONING_EFFORT_BUDGET_TOKEN_MAP = {
        "none": None,
        "low": 1024,
        "medium": 4096,
        "high": 16384,
        "max": 32768,
    }

    # Maximum combined (thinking budget + max_tokens) limit for
    # Claude 3.7 models.
    MAX_COMBINED_TOKENS = 64000
def __init__(self):
self.type = "manifold"
self.id = "anthropic"
self.name = "anthropic/"
self.valves = self.Valves(
**{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")}
)
self.MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image
# Initialize logger
self.log = logging.getLogger("anthropic.pipe")
self.log.setLevel(SRC_LOG_LEVELS["OPENAI"])
pass
def get_anthropic_models(self):
"""Return list of available Anthropic models."""
return [
{"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"},
{"id": "claude-3-opus-20240229", "name": "claude-3-opus"},
{"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
{"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"},
{"id": "claude-3-5-haiku-latest", "name": "claude-3.5-haiku"},
{"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"},
{"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
{"id": "claude-3-5-sonnet-latest", "name": "claude-3.5-sonnet"},
{"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"},
]
    def pipes(self) -> List[dict]:
        """Open WebUI entry point: return the pipe (model) configurations."""
        return self.get_anthropic_models()
async def process_image(self, image_data):
"""Process image data with size validation asynchronously."""
self.log.debug(f"Processing image: {image_data['image_url']['url'][:50]}...")
if image_data["image_url"]["url"].startswith("data:image"):
mime_type, base64_data = image_data["image_url"]["url"].split(",", 1)
media_type = mime_type.split(":")[1].split(";")[0]
# Check base64 image size
image_size = len(base64_data) * 3 / 4 # Convert base64 size to bytes
self.log.debug(f"Base64 image size: {image_size} bytes")
if image_size > self.MAX_IMAGE_SIZE:
raise ValueError(
f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
)
return {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data,
},
}
else:
# For URL images, perform async size check
url = image_data["image_url"]["url"]
async with aiohttp.ClientSession() as session:
async with session.head(url, allow_redirects=True) as response:
content_length = int(response.headers.get("content-length", 0))
self.log.debug(f"URL image size: {content_length} bytes")
if content_length > self.MAX_IMAGE_SIZE:
raise ValueError(
f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
)
return {
"type": "image",
"source": {"type": "url", "url": url},
}
async def pipe(self, body: dict) -> Union[str, AsyncGenerator]:
"""Process the input body and return a response."""
self.log.info("Starting pipe processing")
self.log.debug(f"Input body: {json.dumps(body, indent=2)}")
system_message, messages = pop_system_message(body["messages"])
processed_messages = []
total_image_size = 0
for message in messages:
processed_content = []
if isinstance(message.get("content"), list):
# Check number of images per message (Anthropic limit: 5 per message)
image_count = sum(
1 for item in message["content"] if item["type"] == "image_url"
)
self.log.debug(f"Image count in message: {image_count}")
if image_count > 5:
raise ValueError("Maximum of 5 images per message exceeded")
for item in message["content"]:
if item["type"] == "text":
processed_content.append({"type": "text", "text": item["text"]})
elif item["type"] == "image_url":
processed_image = await self.process_image(item)
processed_content.append(processed_image)
# Track total size for base64 images
if processed_image["source"]["type"] == "base64":
image_size = len(processed_image["source"]["data"]) * 3 / 4
total_image_size += image_size
self.log.debug(f"Total image size so far: {total_image_size} bytes")
if (
total_image_size > 100 * 1024 * 1024
): # 100MB total limit
raise ValueError(
"Total size of images exceeds 100 MB limit"
)
else:
processed_content = [
{"type": "text", "text": message.get("content", "")}
]
processed_messages.append(
{"role": message["role"], "content": processed_content}
)
payload = {
"model": body["model"][body["model"].find(".") + 1 :],
"messages": processed_messages,
"max_tokens": body.get("max_tokens", 4096),
"temperature": body.get("temperature", 0.8),
"top_k": body.get("top_k", 40),
"top_p": body.get("top_p", 0.9),
"stop_sequences": body.get("stop", []),
**({"system": str(system_message)} if system_message else {}),
"stream": body.get("stream", False),
}
self.log.debug(f"Request payload: {json.dumps(payload, indent=2)}")
headers = {
"x-api-key": self.valves.ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
# Add thinking support for Claude 3.7 models
model_id = payload["model"]
supports_thinking = model_id.startswith("claude-3-7")
reasoning_effort = body.get("reasoning_effort", "none")
budget_tokens = self.REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
# Allow integer values for reasoning_effort
if budget_tokens is None and reasoning_effort not in self.REASONING_EFFORT_BUDGET_TOKEN_MAP:
try:
budget_tokens = int(reasoning_effort)
except (ValueError, TypeError):
budget_tokens = None
if supports_thinking and budget_tokens:
max_tokens = payload.get("max_tokens", 4096)
combined_tokens = budget_tokens + max_tokens
self.log.debug(f"Thinking enabled: budget_tokens={budget_tokens}, max_tokens={max_tokens}")
if combined_tokens > self.MAX_COMBINED_TOKENS:
self.log.error(f"Combined tokens exceed limit: {combined_tokens} > {self.MAX_COMBINED_TOKENS}")
return f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {self.MAX_COMBINED_TOKENS}"
payload["max_tokens"] = combined_tokens
payload["thinking"] = {
"type": "enabled",
"budget_tokens": budget_tokens,
}
payload["temperature"] = 1.0
if "top_k" in payload:
del payload["top_k"]
if "top_p" in payload:
del payload["top_p"]
self.log.debug("Adjusted payload for thinking support")
url = "https://api.anthropic.com/v1/messages"
self.log.info(f"Sending request to {url} (stream={payload['stream']})")
if body.get("stream", False):
return self.stream_response(url, headers, payload)
else:
return await self.non_stream_response(url, headers, payload)
    async def stream_response(self, url, headers, payload) -> AsyncGenerator[str, None]:
        """Stream the Anthropic SSE response, yielding text chunks.

        Thinking output is wrapped in <think>...</think> markers so the
        frontend can render it separately from the visible answer.
        """
        self.log.info("Starting streaming response")
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=payload) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.log.error(f"HTTP Error {response.status}: {error_text}")
                        yield f"Error: HTTP Error {response.status}: {error_text}"
                        return
                    # Server-sent events arrive one per line, prefixed "data: ".
                    async for line in response.content:
                        if line:
                            line = line.decode("utf-8")
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    self.log.debug(f"Received stream event: {data['type']}")
                                    if data["type"] == "content_block_start":
                                        # A thinking block opens the <think> marker;
                                        # other block types carry their initial text.
                                        if data["content_block"]["type"] == "thinking":
                                            yield "<think>"
                                        else:
                                            yield data["content_block"]["text"]
                                    elif data["type"] == "content_block_delta":
                                        if data["delta"]["type"] == "thinking_delta":
                                            yield data["delta"]["thinking"]
                                        elif data["delta"]["type"] == "signature_delta":
                                            # signature_delta marks the end of thinking.
                                            yield "\n </think> \n\n"
                                        else:
                                            yield data["delta"]["text"]
                                    elif data["type"] == "message_stop":
                                        self.log.info("Stream completed")
                                        break
                                    elif data["type"] == "message":
                                        # Non-incremental event: emit full text blocks.
                                        for content in data.get("content", []):
                                            if content["type"] == "text":
                                                yield content["text"]
                                except json.JSONDecodeError:
                                    # Skip malformed lines rather than abort the stream.
                                    self.log.error(f"Failed to parse JSON: {line}")
                                    continue
                                except KeyError as e:
                                    # Unknown event shape (e.g. tool blocks): skip it.
                                    self.log.error(f"Unexpected data structure: {e}")
                                    self.log.debug(f"Full data: {data}")
                                    continue
        except Exception as e:
            self.log.exception(f"General error in stream_response method: {e}")
            yield f"Error: {e}"
async def non_stream_response(self, url, headers, payload) -> str:
self.log.info("Starting non-streaming response")
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
if response.status != 200:
error_text = await response.text()
self.log.error(f"HTTP Error {response.status}: {error_text}")
raise Exception(f"HTTP Error {response.status}: {error_text}")
res = await response.json()
self.log.debug(f"Received response: {json.dumps(res, indent=2)}")
for content in res.get("content", []):
if content.get("type") == "text":
self.log.info("Non-streaming response completed")
return content["text"]
self.log.info("Non-streaming response completed (empty content)")
return ""
except Exception as e:
self.log.exception(f"Failed non-stream request: {e}")
return f"Error: {e}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment