Last active
May 24, 2025 09:44
-
-
Save monotykamary/19d759dcb2ecf236db1c9c4cb659d210 to your computer and use it in GitHub Desktop.
Open WebUI Anthropic with Prompt Caching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Anthropic Manifold Pipe | |
| authors: monotykamary | |
| author_url: https://github.com/monotykamary | |
| funding_url: https://github.com/open-webui | |
| version: 0.2.6 | |
| required_open_webui_version: 0.3.17 | |
| license: MIT | |
| """ | |
| import os | |
| import requests | |
| import json | |
| import time | |
| from typing import List, Union, Generator, Iterator | |
| from pydantic import BaseModel, Field | |
| from open_webui.utils.misc import pop_system_message | |
| class Pipe: | |
| class Valves(BaseModel): | |
| ANTHROPIC_API_KEY: str = Field(default="") | |
| def __init__(self): | |
| self.type = "manifold" | |
| self.id = "anthropic" | |
| self.name = "anthropic/" | |
| self.valves = self.Valves( | |
| **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")} | |
| ) | |
| pass | |
| def get_anthropic_models(self): | |
| return [ | |
| {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"}, | |
| {"id": "claude-3-opus-20240229", "name": "claude-3-opus"}, | |
| {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, | |
| {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, | |
| {"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"}, | |
| {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet-v2"}, | |
| {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"}, | |
| {"id": "claude-sonnet-4-20250514", "name": "claude-sonnet-4"}, | |
| {"id": "claude-opus-4-20250514", "name": "claude-opus-4"}, | |
| ] | |
| def pipes(self) -> List[dict]: | |
| return self.get_anthropic_models() | |
| def process_image(self, image_data): | |
| if image_data["image_url"]["url"].startswith("data:image"): | |
| mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) | |
| media_type = mime_type.split(":")[1].split(";")[0] | |
| return { | |
| "type": "image", | |
| "source": { | |
| "type": "base64", | |
| "media_type": media_type, | |
| "data": base64_data, | |
| }, | |
| } | |
| else: | |
| return { | |
| "type": "image", | |
| "source": {"type": "url", "url": image_data["image_url"]["url"]}, | |
| } | |
| def pipe(self, body: dict) -> Union[str, Generator, Iterator]: | |
| system_message, messages = pop_system_message(body["messages"]) | |
| processed_messages = [] | |
| image_count = 0 | |
| total_image_size = 0 | |
| # Find indices of last three user messages | |
| lastThreeUserMsgIndices = [ | |
| i for i, msg in enumerate(messages) if msg["role"] == "user" | |
| ][-3:] | |
| # Process system message | |
| processed_system = [] | |
| if system_message: | |
| processed_system.append( | |
| { | |
| "type": "text", | |
| "text": str(system_message), | |
| "cache_control": {"type": "ephemeral"}, | |
| } | |
| ) | |
| # Process user messages | |
| for i, message in enumerate(messages): | |
| processed_content = [] | |
| is_last_message = i == len(messages) - 1 | |
| if isinstance(message.get("content"), list): | |
| for item in message["content"]: | |
| if item["type"] == "text": | |
| text_content = {"type": "text", "text": item["text"]} | |
| if message["role"] == "user" and i in lastThreeUserMsgIndices: | |
| text_content["cache_control"] = {"type": "ephemeral"} | |
| processed_content.append(text_content) | |
| elif item["type"] == "image_url": | |
| if image_count >= 5: | |
| raise ValueError( | |
| "Maximum of 5 images per API call exceeded" | |
| ) | |
| processed_image = self.process_image(item) | |
| processed_content.append(processed_image) | |
| if processed_image["source"]["type"] == "base64": | |
| image_size = len(processed_image["source"]["data"]) * 3 / 4 | |
| else: | |
| image_size = 0 | |
| total_image_size += image_size | |
| if total_image_size > 100 * 1024 * 1024: | |
| raise ValueError( | |
| "Total size of images exceeds 100 MB limit" | |
| ) | |
| image_count += 1 | |
| else: | |
| text_content = {"type": "text", "text": message.get("content", "")} | |
| if message["role"] == "user" and i in lastThreeUserMsgIndices: | |
| text_content["cache_control"] = {"type": "ephemeral"} | |
| processed_content = [text_content] | |
| processed_messages.append( | |
| {"role": message["role"], "content": processed_content} | |
| ) | |
| payload = { | |
| "model": body["model"][body["model"].find(".") + 1 :], | |
| "messages": processed_messages, | |
| "max_tokens": body.get("max_tokens", 4096), | |
| "temperature": body.get("temperature", 0.8), | |
| "top_k": body.get("top_k", 40), | |
| "top_p": body.get("top_p", 0.9), | |
| "stop_sequences": body.get("stop", []), | |
| "system": processed_system, | |
| "stream": body.get("stream", False), | |
| } | |
| headers = { | |
| "x-api-key": self.valves.ANTHROPIC_API_KEY, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "prompt-caching-2024-07-31", | |
| "content-type": "application/json", | |
| } | |
| url = "https://api.anthropic.com/v1/messages" | |
| try: | |
| if body.get("stream", False): | |
| return self.stream_response(url, headers, payload) | |
| else: | |
| return self.non_stream_response(url, headers, payload) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| return f"Error: Request failed: {e}" | |
| except Exception as e: | |
| print(f"Error in pipe method: {e}") | |
| return f"Error: {e}" | |
| def stream_response(self, url, headers, payload): | |
| try: | |
| with requests.post( | |
| url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) | |
| ) as response: | |
| if response.status_code != 200: | |
| raise Exception( | |
| f"HTTP Error {response.status_code}: {response.text}" | |
| ) | |
| for line in response.iter_lines(): | |
| if line: | |
| line = line.decode("utf-8") | |
| if line.startswith("data: "): | |
| try: | |
| data = json.loads(line[6:]) | |
| if data["type"] == "content_block_start": | |
| yield data["content_block"]["text"] | |
| elif data["type"] == "content_block_delta": | |
| yield data["delta"]["text"] | |
| elif data["type"] == "message_stop": | |
| break | |
| elif data["type"] == "message": | |
| for content in data.get("content", []): | |
| if content["type"] == "text": | |
| yield content["text"] | |
| # Delay to avoid overwhelming the client | |
| time.sleep(0.01) | |
| except json.JSONDecodeError: | |
| print(f"Failed to parse JSON: {line}") | |
| except KeyError as e: | |
| print(f"Unexpected data structure: {e}") | |
| print(f"Full data: {data}") | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| yield f"Error: Request failed: {e}" | |
| except Exception as e: | |
| print(f"General error in stream_response method: {e}") | |
| yield f"Error: {e}" | |
| def non_stream_response(self, url, headers, payload): | |
| try: | |
| response = requests.post( | |
| url, headers=headers, json=payload, timeout=(3.05, 60) | |
| ) | |
| if response.status_code != 200: | |
| raise Exception(f"HTTP Error {response.status_code}: {response.text}") | |
| res = response.json() | |
| return ( | |
| res["content"][0]["text"] if "content" in res and res["content"] else "" | |
| ) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Failed non-stream request: {e}") | |
| return f"Error: {e}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Anthropic Manifold Pipe | |
| authors: monotykamary | |
| author_url: https://github.com/monotykamary | |
| funding_url: https://github.com/open-webui | |
| version: 0.2.6 | |
| required_open_webui_version: 0.3.17 | |
| license: MIT | |
| """ | |
| import os | |
| import requests | |
| import json | |
| import time | |
| from typing import List, Union, Generator, Iterator | |
| from pydantic import BaseModel, Field | |
| from open_webui.utils.misc import pop_system_message | |
| class Pipe: | |
| class Valves(BaseModel): | |
| ANTHROPIC_API_KEY: str = Field(default="") | |
| def __init__(self): | |
| self.type = "manifold" | |
| self.id = "anthropic" | |
| self.name = "anthropic/" | |
| self.valves = self.Valves( | |
| **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")} | |
| ) | |
| pass | |
| def get_anthropic_models(self): | |
| return [ | |
| {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"}, | |
| {"id": "claude-3-opus-20240229", "name": "claude-3-opus"}, | |
| {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, | |
| {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, | |
| {"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"}, | |
| {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet-v2"}, | |
| {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"}, | |
| {"id": "claude-sonnet-4-20250514", "name": "claude-sonnet-4"}, | |
| {"id": "claude-opus-4-20250514", "name": "claude-opus-4"}, | |
| ] | |
| def pipes(self) -> List[dict]: | |
| return self.get_anthropic_models() | |
| def process_image(self, image_data): | |
| if image_data["image_url"]["url"].startswith("data:image"): | |
| mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) | |
| media_type = mime_type.split(":")[1].split(";")[0] | |
| return { | |
| "type": "image", | |
| "source": { | |
| "type": "base64", | |
| "media_type": media_type, | |
| "data": base64_data, | |
| }, | |
| } | |
| else: | |
| return { | |
| "type": "image", | |
| "source": {"type": "url", "url": image_data["image_url"]["url"]}, | |
| } | |
| def pipe(self, body: dict) -> Union[str, Generator, Iterator]: | |
| system_message, messages = pop_system_message(body["messages"]) | |
| processed_messages = [] | |
| image_count = 0 | |
| total_image_size = 0 | |
| # Find indices of last two user messages | |
| lastTwoUserMsgIndices = [ | |
| i for i, msg in enumerate(messages) if msg["role"] == "user" | |
| ][-2:] | |
| # Process system message | |
| processed_system = [] | |
| if system_message: | |
| processed_system.append({ | |
| "type": "text", | |
| "text": str(system_message), | |
| "cache_control": {"type": "ephemeral"} | |
| }) | |
| # Process user messages | |
| for i, message in enumerate(messages): | |
| processed_content = [] | |
| is_last_message = i == len(messages) - 1 | |
| if isinstance(message.get("content"), list): | |
| for item in message["content"]: | |
| if item["type"] == "text": | |
| text_content = {"type": "text", "text": item["text"]} | |
| if message["role"] == "user" and i in lastTwoUserMsgIndices: | |
| text_content["cache_control"] = {"type": "ephemeral"} | |
| processed_content.append(text_content) | |
| elif item["type"] == "image_url": | |
| if image_count >= 5: | |
| raise ValueError("Maximum of 5 images per API call exceeded") | |
| processed_image = self.process_image(item) | |
| processed_content.append(processed_image) | |
| if processed_image["source"]["type"] == "base64": | |
| image_size = len(processed_image["source"]["data"]) * 3 / 4 | |
| else: | |
| image_size = 0 | |
| total_image_size += image_size | |
| if total_image_size > 100 * 1024 * 1024: | |
| raise ValueError("Total size of images exceeds 100 MB limit") | |
| image_count += 1 | |
| else: | |
| text_content = {"type": "text", "text": message.get("content", "")} | |
| if message["role"] == "user" and i in lastTwoUserMsgIndices: | |
| text_content["cache_control"] = {"type": "ephemeral"} | |
| processed_content = [text_content] | |
| processed_messages.append({ | |
| "role": message["role"], | |
| "content": processed_content | |
| }) | |
| payload = { | |
| "model": body["model"][body["model"].find(".") + 1 :], | |
| "messages": processed_messages, | |
| "max_tokens": body.get("max_tokens", 4096), | |
| "temperature": body.get("temperature", 0.8), | |
| "top_k": body.get("top_k", 40), | |
| "top_p": body.get("top_p", 0.9), | |
| "stop_sequences": body.get("stop", []), | |
| "system": processed_system, | |
| "stream": body.get("stream", False), | |
| } | |
| headers = { | |
| "x-api-key": self.valves.ANTHROPIC_API_KEY, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "prompt-caching-2024-07-31", | |
| "content-type": "application/json", | |
| } | |
| url = "https://api.anthropic.com/v1/messages" | |
| try: | |
| if body.get("stream", False): | |
| return self.stream_response(url, headers, payload) | |
| else: | |
| return self.non_stream_response(url, headers, payload) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| return f"Error: Request failed: {e}" | |
| except Exception as e: | |
| print(f"Error in pipe method: {e}") | |
| return f"Error: {e}" | |
| def stream_response(self, url, headers, payload): | |
| try: | |
| with requests.post( | |
| url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) | |
| ) as response: | |
| if response.status_code != 200: | |
| raise Exception( | |
| f"HTTP Error {response.status_code}: {response.text}" | |
| ) | |
| for line in response.iter_lines(): | |
| if line: | |
| line = line.decode("utf-8") | |
| if line.startswith("data: "): | |
| try: | |
| data = json.loads(line[6:]) | |
| if data["type"] == "content_block_start": | |
| yield data["content_block"]["text"] | |
| elif data["type"] == "content_block_delta": | |
| yield data["delta"]["text"] | |
| elif data["type"] == "message_stop": | |
| break | |
| elif data["type"] == "message": | |
| for content in data.get("content", []): | |
| if content["type"] == "text": | |
| yield content["text"] | |
| # Delay to avoid overwhelming the client | |
| time.sleep(0.01) | |
| except json.JSONDecodeError: | |
| print(f"Failed to parse JSON: {line}") | |
| except KeyError as e: | |
| print(f"Unexpected data structure: {e}") | |
| print(f"Full data: {data}") | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| yield f"Error: Request failed: {e}" | |
| except Exception as e: | |
| print(f"General error in stream_response method: {e}") | |
| yield f"Error: {e}" | |
| def non_stream_response(self, url, headers, payload): | |
| try: | |
| response = requests.post( | |
| url, headers=headers, json=payload, timeout=(3.05, 60) | |
| ) | |
| if response.status_code != 200: | |
| raise Exception(f"HTTP Error {response.status_code}: {response.text}") | |
| res = response.json() | |
| return ( | |
| res["content"][0]["text"] if "content" in res and res["content"] else "" | |
| ) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Failed non-stream request: {e}") | |
| return f"Error: {e}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Anthropic Manifold Pipe | |
| authors: monotykamary | |
| author_url: https://github.com/monotykamary | |
| funding_url: https://github.com/open-webui | |
| version: 0.2.6 | |
| required_open_webui_version: 0.3.17 | |
| license: MIT | |
| """ | |
| import os | |
| import requests | |
| import json | |
| import time | |
| from typing import List, Union, Generator, Iterator | |
| from pydantic import BaseModel, Field | |
| from open_webui.utils.misc import pop_system_message | |
| def estimate_tokens(text: str) -> int: | |
| # A simple estimation: roughly 4 characters per token | |
| return len(text) // 4 | |
| class Pipe: | |
| class Valves(BaseModel): | |
| ANTHROPIC_API_KEY: str = Field(default="") | |
| def __init__(self): | |
| self.type = "manifold" | |
| self.id = "anthropic" | |
| self.name = "anthropic/" | |
| self.valves = self.Valves( | |
| **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")} | |
| ) | |
| pass | |
| def get_anthropic_models(self): | |
| return [ | |
| {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"}, | |
| {"id": "claude-3-opus-20240229", "name": "claude-3-opus"}, | |
| {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, | |
| {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, | |
| {"id": "claude-3-5-sonnet-20240620", "name": "claude-3.5-sonnet"}, | |
| {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet-v2"}, | |
| {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"}, | |
| {"id": "claude-sonnet-4-20250514", "name": "claude-sonnet-4"}, | |
| {"id": "claude-opus-4-20250514", "name": "claude-opus-4"}, | |
| ] | |
| def pipes(self) -> List[dict]: | |
| return self.get_anthropic_models() | |
| def process_image(self, image_data): | |
| if image_data["image_url"]["url"].startswith("data:image"): | |
| mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) | |
| media_type = mime_type.split(":")[1].split(";")[0] | |
| return { | |
| "type": "image", | |
| "source": { | |
| "type": "base64", | |
| "media_type": media_type, | |
| "data": base64_data, | |
| }, | |
| } | |
| else: | |
| return { | |
| "type": "image", | |
| "source": {"type": "url", "url": image_data["image_url"]["url"]}, | |
| } | |
| def pipe(self, body: dict) -> Union[str, Generator, Iterator]: | |
| system_message, messages = pop_system_message(body["messages"]) | |
| processed_messages = [] | |
| image_count = 0 | |
| total_image_size = 0 | |
| cache_control_count = 0 # Add counter for cache control blocks | |
| # Process system message | |
| processed_system = [] | |
| if system_message: | |
| processed_system.append({ | |
| "type": "text", | |
| "text": str(system_message), | |
| "cache_control": {"type": "ephemeral"} | |
| }) | |
| cache_control_count += 1 # Increment counter for system message | |
| # Estimate tokens for each message, excluding the last one | |
| message_sizes = [ | |
| (i, estimate_tokens(str(m.get("content", "")))) | |
| for i, m in enumerate(messages[:-1]) | |
| ] | |
| message_sizes.sort(key=lambda x: x[1], reverse=True) | |
| # Select top 3 largest messages for caching | |
| to_cache = set(x[0] for x in message_sizes[:3]) | |
| # Process user messages | |
| for i, message in enumerate(messages): | |
| processed_content = [] | |
| is_last_message = i == len(messages) - 1 | |
| if isinstance(message.get("content"), list): | |
| for item in message["content"]: | |
| if item["type"] == "text": | |
| text_content = {"type": "text", "text": item["text"]} | |
| # Only add cache_control if we haven't reached the limit | |
| if not is_last_message and i in to_cache and cache_control_count < 4: | |
| text_content["cache_control"] = {"type": "ephemeral"} | |
| cache_control_count += 1 | |
| processed_content.append(text_content) | |
| elif item["type"] == "image_url": | |
| if image_count >= 5: | |
| raise ValueError("Maximum of 5 images per API call exceeded") | |
| processed_image = self.process_image(item) | |
| processed_content.append(processed_image) | |
| if processed_image["source"]["type"] == "base64": | |
| image_size = len(processed_image["source"]["data"]) * 3 / 4 | |
| else: | |
| image_size = 0 | |
| total_image_size += image_size | |
| if total_image_size > 100 * 1024 * 1024: | |
| raise ValueError("Total size of images exceeds 100 MB limit") | |
| image_count += 1 | |
| else: | |
| text_content = {"type": "text", "text": message.get("content", "")} | |
| if not is_last_message and i in to_cache and cache_control_count < 4: | |
| text_content["cache_control"] = {"type": "ephemeral"} | |
| cache_control_count += 1 | |
| processed_content = [text_content] | |
| processed_messages.append({ | |
| "role": message["role"], | |
| "content": processed_content | |
| }) | |
| payload = { | |
| "model": body["model"][body["model"].find(".") + 1 :], | |
| "messages": processed_messages, | |
| "max_tokens": body.get("max_tokens", 4096), | |
| "temperature": body.get("temperature", 0.8), | |
| "top_k": body.get("top_k", 40), | |
| "top_p": body.get("top_p", 0.9), | |
| "stop_sequences": body.get("stop", []), | |
| "system": processed_system, | |
| "stream": body.get("stream", False), | |
| } | |
| headers = { | |
| "x-api-key": self.valves.ANTHROPIC_API_KEY, | |
| "anthropic-version": "2023-06-01", | |
| "anthropic-beta": "prompt-caching-2024-07-31", | |
| "content-type": "application/json", | |
| } | |
| url = "https://api.anthropic.com/v1/messages" | |
| try: | |
| if body.get("stream", False): | |
| return self.stream_response(url, headers, payload) | |
| else: | |
| return self.non_stream_response(url, headers, payload) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| return f"Error: Request failed: {e}" | |
| except Exception as e: | |
| print(f"Error in pipe method: {e}") | |
| return f"Error: {e}" | |
| def stream_response(self, url, headers, payload): | |
| try: | |
| with requests.post( | |
| url, headers=headers, json=payload, stream=True, timeout=(3.05, 60) | |
| ) as response: | |
| if response.status_code != 200: | |
| raise Exception( | |
| f"HTTP Error {response.status_code}: {response.text}" | |
| ) | |
| for line in response.iter_lines(): | |
| if line: | |
| line = line.decode("utf-8") | |
| if line.startswith("data: "): | |
| try: | |
| data = json.loads(line[6:]) | |
| if data["type"] == "content_block_start": | |
| yield data["content_block"]["text"] | |
| elif data["type"] == "content_block_delta": | |
| yield data["delta"]["text"] | |
| elif data["type"] == "message_stop": | |
| break | |
| elif data["type"] == "message": | |
| for content in data.get("content", []): | |
| if content["type"] == "text": | |
| yield content["text"] | |
| # Delay to avoid overwhelming the client | |
| time.sleep(0.01) | |
| except json.JSONDecodeError: | |
| print(f"Failed to parse JSON: {line}") | |
| except KeyError as e: | |
| print(f"Unexpected data structure: {e}") | |
| print(f"Full data: {data}") | |
| except requests.exceptions.RequestException as e: | |
| print(f"Request failed: {e}") | |
| yield f"Error: Request failed: {e}" | |
| except Exception as e: | |
| print(f"General error in stream_response method: {e}") | |
| yield f"Error: {e}" | |
| def non_stream_response(self, url, headers, payload): | |
| try: | |
| response = requests.post( | |
| url, headers=headers, json=payload, timeout=(3.05, 60) | |
| ) | |
| if response.status_code != 200: | |
| raise Exception(f"HTTP Error {response.status_code}: {response.text}") | |
| res = response.json() | |
| return ( | |
| res["content"][0]["text"] if "content" in res and res["content"] else "" | |
| ) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Failed non-stream request: {e}") | |
| return f"Error: {e}" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment