Last active
October 22, 2024 18:43
-
-
Save matthewhand/7ed94b3488aa652e4f44b2f1237d3c35 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Autogen Action | |
| author: matthewh | |
| version: 2.0 | |
| required_open_webui_version: 0.3.9 | |
| Instructions: | |
| Install and run the UI like this: | |
| pip install autogenstudio flaml[automl] matplotlib | |
| autogenstudio ui --port=8081 --docs --host 0.0.0.0 --workers=2 | |
| Browse to http://localhost:8081/ and construct your model/agent/workflow. | |
| I am using LM Studio with Qwen-2.5. | |
| Then export the workflow and host it as an endpoint | |
| (**EXPORT! DO NOT DOWNLOAD** — this had me stumped for an hour): | |
| autogenstudio serve --workflow=workflow.json --port=8082 --host 0.0.0.0 --docs --workers=2 | |
| Browse to http://localhost:8082/docs, and you can infer multi-agent goodness. | |
| """ | |
| import os | |
| import aiohttp | |
| import asyncio | |
| import urllib.parse | |
| import time | |
| from pydantic import BaseModel, Field | |
| from typing import Optional, Callable, Awaitable, Dict, Any, List | |
class Action:
    """Open WebUI Action that forwards the most recent user message to an
    AutoGen Studio workflow endpoint and writes the reply back into the chat.

    Depending on the ``overwrite`` valve, the reply either replaces the last
    assistant message in place (overwrite mode) or is appended as a new
    message via the event emitter (amend mode).
    """

    class Valves(BaseModel):
        # User-tunable configuration exposed by Open WebUI.
        AUTOGEN_BASE_URL: str = Field(
            default=os.getenv(
                "AUTOGEN_BASE_URL", "http://host.docker.internal:8082/predict"
            ),
            description="Base URL for the AutoGen endpoint.",
        )
        emit_interval: float = Field(
            default=1.0, description="Interval in seconds between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions."
        )
        request_timeout: int = Field(
            default=300, description="Timeout for the HTTP client session in seconds."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )
        overwrite: bool = Field(
            default=True,
            description="If True, replaces the assistant's message; if False, amends the new response using event emitter.",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Set to signal the periodic status-emitter task (spawned by
        # action()) that it should stop.
        self.stop_emitter = asyncio.Event()
        if self.valves.debug:
            print(f"Initialized with endpoint: {self.valves.AUTOGEN_BASE_URL}")

    async def action(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Optional[dict]:
        """Handle one action invocation.

        Returns the (possibly mutated) request ``body`` in overwrite mode,
        ``None`` in amend mode (the reply is delivered via the emitter), or
        an ``{"error": ...}`` dict on failure.
        """
        if self.valves.debug:
            print("Starting action with body:", body)
        status_task = None
        if __event_emitter__ and self.valves.enable_status_indicator:
            self.stop_emitter.clear()
            status_task = asyncio.create_task(
                self.emit_periodic_status(
                    __event_emitter__,
                    "Autogen... Processing request",
                    self.valves.emit_interval,
                )
            )
        try:
            messages = body.get("messages", [])
            if not messages:
                if self.valves.debug:
                    print("No messages found in the request body.")
                return {"error": "No messages found."}
            # Get the most recent user message to use as the prompt.
            prompt = self.get_most_recent_user_message(messages)
            if not prompt:
                return {"error": "No valid user message found."}
            if self.valves.debug:
                print(f"Prompt extracted: {prompt}")
            encoded_prompt = self.clean_prompt(prompt)
            if self.valves.debug:
                print(f"Encoded prompt: {encoded_prompt}")
            start_time = time.time()
            await self.emit_status(
                __event_emitter__,
                "info",
                "Waiting for response from AutoGen endpoint.",
                False,
            )
            response = await self.autogen_process(encoded_prompt, __event_emitter__)
            if self.valves.debug:
                print(f"Initial response from endpoint: {response}")
            # autogen_process signals failure with an "Error:" sentinel prefix.
            if response.startswith("Error:"):
                await self.emit_status(__event_emitter__, "error", response, True)
                return {"error": response}
            if self.valves.overwrite:
                # Overwrite mode: replace the last assistant message, or
                # append a new one if none exists.  Track the message that was
                # actually touched so the UI update event points at it — the
                # overwritten message is not necessarily messages[-1].
                updated_message = None
                for message in reversed(messages):
                    if message.get("role") == "assistant":
                        message["content"] = response
                        updated_message = message
                        if self.valves.debug:
                            print(
                                "Overwritten existing assistant message (overwrite mode)."
                            )
                        break
                if updated_message is None:
                    # No existing assistant message, append a new one.
                    updated_message = {"role": "assistant", "content": response}
                    messages.append(updated_message)
                    if self.valves.debug:
                        print("Appended new assistant message (overwrite mode).")
                elapsed_time = time.time() - start_time
                if self.valves.debug:
                    print(f"Final response being returned: {body}")
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"Autogen process completed successfully. (elapsed: {elapsed_time:.1f}s)",
                    True,
                )
                # Emit 'message_update' so the UI refreshes the edited message.
                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "message_update",
                            "data": {
                                "messages": messages,
                                "updated_message": updated_message,
                            },
                        }
                    )
                # Return the modified body so the updated messages persist.
                return body
            else:
                # Amend mode: emit a new message via the event emitter and do
                # NOT return the body (returning it would reset the UI state).
                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "message",  # 'message' type appends content
                            "data": {"content": response},
                            # No 'done' condition needed
                        }
                    )
                    if self.valves.debug:
                        print("Emitted new message via event emitter (amend mode).")
                else:
                    # Fallback: mutate the messages list in place when no
                    # event emitter is available.
                    messages.append({"role": "assistant", "content": response})
                    if self.valves.debug:
                        print(
                            "Appended new assistant message via fallback (amend mode)."
                        )
                elapsed_time = time.time() - start_time
                if self.valves.debug:
                    print(f"Final response being returned: {body}")
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"Autogen process completed successfully. (elapsed: {elapsed_time:.1f}s)",
                    True,
                )
                # In amend mode, do not return the body to prevent the UI
                # from resetting messages.
                return None
        finally:
            # Always stop the periodic status emitter, even on error paths.
            if status_task:
                self.stop_emitter.set()
                await status_task

    def get_most_recent_user_message(
        self, messages: List[Dict[str, Any]]
    ) -> Optional[str]:
        """Retrieve the most recent user message content, or None if absent."""
        for message in reversed(messages):
            if message.get("role") == "user" and "content" in message:
                return message["content"]
        return None

    def clean_prompt(self, prompt: str) -> str:
        """Percent-encode the prompt for use as a URL path segment.

        safe="" ensures '/' is escaped too, so the prompt cannot break the
        endpoint's path structure.
        """
        escaped_prompt = urllib.parse.quote(prompt, safe="")
        if self.valves.debug:
            print(f"Cleaned prompt: {escaped_prompt}")
        return escaped_prompt

    async def autogen_process(
        self,
        prompt: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """GET the AutoGen endpoint with the encoded prompt in the URL path
        and extract a text reply from the JSON response.

        Always returns a string; failures are reported as strings starting
        with "Error:" (the sentinel checked by action()).
        """
        base_url = self.valves.AUTOGEN_BASE_URL
        timeout = aiohttp.ClientTimeout(total=self.valves.request_timeout)
        url_with_prompt = f"{base_url}/{prompt}"
        headers = {"Accept": "application/json"}
        if self.valves.debug:
            print(
                f"Sending request to {url_with_prompt} with timeout {self.valves.request_timeout}"
            )
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url_with_prompt, headers=headers) as response:
                    if self.valves.debug:
                        print(f"Received status code: {response.status}")
                    response.raise_for_status()
                    data = await response.json()
                    if self.valves.debug:
                        print(f"Received data: {data}")
                    # Handle the different response structures the endpoint
                    # is known to produce.
                    if isinstance(data, dict):
                        if "response" in data:
                            return data["response"]
                        if "message" in data and "content" in data["message"]:
                            return data["message"]["content"]
                        if (
                            "meta" in data.get("data", {})
                            and "messages" in data["data"]["meta"]
                        ):
                            # Use the second-to-last message: the final entry
                            # is presumably a workflow termination marker
                            # (TODO confirm against the served workflow).
                            messages = data["data"]["meta"]["messages"]
                            for message in reversed(messages[:-1]):
                                if (
                                    isinstance(message, dict)
                                    and "message" in message
                                    and "content" in message["message"]
                                ):
                                    return message["message"]["content"]
                        # Fix: a dict matching none of the known shapes used
                        # to fall through and implicitly return None, which
                        # crashed the caller's response.startswith() check.
                        return "No valid response content found."
                    return "Unexpected response format."
        except Exception as e:
            error_message = f"Error during request: {str(e)}"
            if self.valves.debug:
                print(error_message)
            await self.emit_status(__event_emitter__, "error", error_message, True)
            return f"Error: {str(e)}"

    async def emit_periodic_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        message: str,
        interval: float,
    ):
        """Emit status updates every `interval` seconds until stop_emitter
        is set, appending the elapsed time to the message."""
        start_time = time.time()
        try:
            while not self.stop_emitter.is_set():
                elapsed_time = time.time() - start_time
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"{message} (elapsed: {elapsed_time:.1f}s)",
                    False,
                )
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            if self.valves.debug:
                print("Periodic status emitter cancelled.")

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        """Emit a single status event; no-op when no emitter is provided."""
        if __event_emitter__:
            event = {
                "type": "status",
                "data": {
                    "status": "complete" if done else "in_progress",
                    "level": level,
                    "description": message,
                    "done": done,
                },
            }
            if self.valves.debug:
                print(f"Emitting status event: {event}")
            await __event_emitter__(event)

    async def on_start(self):
        """Lifecycle hook invoked when the action starts."""
        if self.valves.debug:
            print("Autogen Action started")

    async def on_stop(self):
        """Lifecycle hook invoked when the action stops."""
        if self.valves.debug:
            print("Autogen Action stopped")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Autogen Pipeline | |
| author: matthewh | |
| version: 1.4 | |
| required_open_webui_version: 0.3.9 | |
| Instructions: | |
| 1. Install and run the UI: | |
| pip install autogenstudio flaml[automl] matplotlib | |
| 2. Start the UI: | |
| autogenstudio ui --port=8081 --docs --host=0.0.0.0 --workers=2 | |
| 3. Construct your model/agent/workflow at: | |
| http://localhost:8081/ | |
| 4. Export the workflow as an endpoint: | |
| autogenstudio serve --workflow=workflow.json --port=8082 --host=0.0.0.0 --docs --workers=2 | |
| 5. Use the endpoint at: | |
| http://localhost:8082/docs | |
| """ | |
| from typing import Optional, Callable, Awaitable, Dict, Any, List, Union | |
| import aiohttp | |
| import json | |
| import urllib.parse | |
| import time | |
| import asyncio | |
| from pydantic import BaseModel, Field | |
class Pipe:
    """Pipeline for managing interactions with the AutoGen endpoint."""

    class Valves(BaseModel):
        # User-tunable configuration exposed by Open WebUI.
        AUTOGEN_BASE_URL: str = Field(
            default="http://host.docker.internal:8082/predict",
            description="Base URL for the AutoGen endpoint.",
        )
        emit_interval: float = Field(
            default=1.0, description="Interval in seconds between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions."
        )
        request_timeout: int = Field(
            default=300, description="Timeout for the HTTP client session in seconds."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )
        history_length: int = Field(
            default=1,
            description="Number of messages to include from the chat history, starting from the most recent. Default is 1, which includes the user's message and the previous assistant message if available.",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Set to signal the periodic status-emitter task (spawned by pipe())
        # that it should stop.
        self.stop_emitter = asyncio.Event()

    async def emit_periodic_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        message: str,
        interval: float,
    ):
        """Emit status updates every `interval` seconds until the stop event
        is set, appending the elapsed time to the message."""
        start_time = time.time()
        try:
            while not self.stop_emitter.is_set():
                elapsed_time = time.time() - start_time
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"{message} (elapsed: {elapsed_time:.1f}s)",
                    False,
                )
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            if self.valves.debug:
                print("[DEBUG] Periodic status emitter cancelled.")

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Union[str, Dict[str, Any]]:
        """Main handler for the AutoGen API interactions.

        Returns the extracted response text on success, or an
        ``{"error": ...}`` dict on failure.  (The original annotation claimed
        Optional[Dict[str, Any]] even though the success path returns a str.)
        """
        status_task = None
        start_time = time.time()
        try:
            # Start emitting status updates periodically.
            if __event_emitter__ and self.valves.enable_status_indicator:
                self.stop_emitter.clear()
                status_task = asyncio.create_task(
                    self.emit_periodic_status(
                        __event_emitter__,
                        "Processing request to AutoGen endpoint...",
                        self.valves.emit_interval,
                    )
                )
            # Extract messages and prepare the prompt.
            messages = body.get("messages", [])
            if not messages:
                return {"error": "No messages found in the request body"}
            prompt = self._get_combined_prompt(messages)
            # Prepare and make the API request.
            response = await self.call_autogen_api(prompt)
            # Extract the reply text from the raw JSON response.
            extracted_content = self._extract_content(response)
            # Emit completed status with elapsed time.
            elapsed_time = time.time() - start_time
            await self.emit_status(
                __event_emitter__,
                "info",
                f"Autogen - Pipe Completed (elapsed: {elapsed_time:.1f}s)",
                True,
            )
            return extracted_content
        except Exception as e:
            # Report the failure to the UI and the caller.
            await self.emit_status(__event_emitter__, "error", f"Error: {str(e)}", True)
            if self.valves.debug:
                print(f"[DEBUG] Error during pipe: {e}")
            return {"error": str(e)}
        finally:
            # Always stop the periodic status emitter, even on error paths.
            if status_task:
                self.stop_emitter.set()
                await status_task

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        """Emit a single status event; no-op when no emitter is provided."""
        if __event_emitter__:
            event = {
                "type": "status",
                # Fix: `level` was accepted but silently dropped; include it
                # for consistency with the companion Action's emit_status.
                "data": {"level": level, "description": message, "done": done},
            }
            if self.valves.debug:
                print(f"[DEBUG] Emitting status event: {event}")
            await __event_emitter__(event)

    def _get_combined_prompt(self, messages: List[Dict[str, str]]) -> str:
        """Combine recent user/assistant messages into one labeled prompt.

        history_length counts user/assistant *pairs*, hence the factor of 2
        when slicing the raw message list.
        """
        prompt_parts = []
        history_length = (
            self.valves.history_length * 2
        )  # Each user message is paired with an assistant message
        recent_messages = (
            messages[-history_length:] if history_length < len(messages) else messages
        )
        # Label each message with its role so the workflow sees the dialogue.
        for message in recent_messages:
            role = message.get("role", "user")
            content = message.get("content", "")
            if role == "user":
                prompt_parts.append(f"User: {content}")
            elif role == "assistant":
                prompt_parts.append(f"Assistant: {content}")
        combined_prompt = "\n".join(prompt_parts)
        if self.valves.debug:
            print(f"[DEBUG] Combined prompt: {combined_prompt}")
        return combined_prompt

    async def call_autogen_api(self, prompt: str) -> str:
        """Make an asynchronous GET call to the AutoGen API with the encoded
        prompt in the URL path; returns the raw response body text.

        Raises ValueError on any non-200 status.
        """
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.valves.request_timeout)
        ) as session:
            # Fix: quote() with the default safe='/' left slashes in the
            # prompt unescaped, corrupting the URL path.  safe="" matches the
            # encoding used by the companion Action's clean_prompt().
            url = f"{self.valves.AUTOGEN_BASE_URL}/{urllib.parse.quote(prompt, safe='')}"
            headers = {
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
            if self.valves.debug:
                print(f"[DEBUG] Calling API at: {url}")
                print(f"[DEBUG] Headers: {headers}")
            async with session.get(url, headers=headers) as response:
                if response.status == 200:
                    data = await response.text()
                    if self.valves.debug:
                        print(f"[DEBUG] API Response: {data}")
                    return data
                else:
                    raise ValueError(f"API call failed with status {response.status}")

    def _extract_content(self, response: str) -> str:
        """Extract the reply text from the raw JSON API response.

        Always returns a string; parse failures are reported as descriptive
        strings rather than raised.
        """
        try:
            data = json.loads(response)
            meta = data.get("data", {}).get("meta", {})
            messages = meta.get("messages", [])
            if len(messages) > 1:
                # Second-to-last message: the final entry is presumably a
                # workflow termination marker (TODO confirm against the
                # served workflow).
                last_message = messages[-2].get("message", {}).get("content", "")
                if self.valves.debug:
                    print(f"[DEBUG] Extracted content: {last_message}")
                return last_message
            else:
                return "No valid content found in response"
        except json.JSONDecodeError as e:
            if self.valves.debug:
                print(f"[DEBUG] JSON parsing error: {e}")
            return "Error extracting content due to JSON parsing error"
        except Exception as e:
            if self.valves.debug:
                print(f"[DEBUG] Error extracting content: {e}")
            return "Error extracting content"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Autogen Tool | |
| author: matthewh | |
| version: 1.0 | |
| required_open_webui_version: 0.3.9 | |
| Instructions: | |
| 1. Install and run AutoGen Studio: | |
| ```bash | |
| pip install autogenstudio flaml[automl] matplotlib | |
| autogenstudio ui --port=8081 --docs --host=0.0.0.0 --workers=2 | |
| ``` | |
| 2. Export the workflow and serve it as an endpoint: | |
| ```bash | |
| autogenstudio serve --workflow=workflow.json --port=8082 --host=0.0.0.0 --docs --workers=2 | |
| ``` | |
| 3. Use http://localhost:8082/docs to interact with the endpoint. | |
| 4. This tool sends a query to the AutoGen API and displays the response correctly in the chat. | |
| """ | |
| import os | |
| import requests | |
| import urllib.parse | |
| from typing import Optional, Dict # Ensure Dict is imported | |
| from pydantic import BaseModel, Field | |
class Tools:
    """
    Tools class to house the AutoGen tool functions, compatible with Open WebUI.
    """

    class Valves(BaseModel):
        """
        Configurable parameters (valves) for the AutoGen tool.
        """

        AUTOGEN_BASE_URL: str = Field(
            default=os.getenv(
                "AUTOGEN_BASE_URL", "http://host.docker.internal:8082/predict"
            ),
            description="Base URL for the AutoGen endpoint.",
        )
        request_timeout: int = Field(
            default=300, description="Timeout for the HTTP request (in seconds)."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )

    def __init__(self):
        self.valves = self.Valves()

    def autogen_tool(self, query: str) -> Dict[str, str]:
        """
        AutoGen tool that performs a query using the AutoGen API.

        :param query: User's input query.
        :return: A dictionary with a "content" key holding the formatted
                 response (or an error description) for the LLM.
        """
        if self.valves.debug:
            print(f"[DEBUG] Starting tool with query: {query}")
        # safe="" so '/' in the query is escaped and cannot break the URL path.
        encoded_prompt = urllib.parse.quote(query, safe="")
        url = f"{self.valves.AUTOGEN_BASE_URL}/{encoded_prompt}"
        headers = {"Accept": "application/json"}
        timeout = self.valves.request_timeout
        if self.valves.debug:
            print(f"[DEBUG] Sending request to {url}")
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()
            if self.valves.debug:
                print(f"[DEBUG] Response data: {data}")
            # Format the response as an LLM-friendly prompt
            if isinstance(data, dict) and "response" in data:
                message = data["response"]
                formatted_message = (
                    f"Provide the following response to the user:\n"
                    f"```autogen\n{message}\n```"
                )
                return {"content": formatted_message}
            return {"content": "Unexpected response format."}
        # Fix: a bare `except Exception` also swallowed programming errors;
        # catch only network/HTTP failures (RequestException, which
        # raise_for_status raises) and JSON decode errors (ValueError is the
        # base of requests' JSONDecodeError).
        except (requests.RequestException, ValueError) as e:
            error_message = f"Error during request: {str(e)}"
            if self.valves.debug:
                print(f"[DEBUG] {error_message}")
            return {"content": error_message}
| # # Example usage for local testing | |
| # if __name__ == "__main__": | |
| # tools = Tools() # Initialize without custom valves | |
| # query = "example query" | |
| # result = tools.autogen_tool(query) | |
| # print(result["content"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment