Skip to content

Instantly share code, notes, and snippets.

@matthewhand
Last active October 22, 2024 18:43
Show Gist options
  • Save matthewhand/7ed94b3488aa652e4f44b2f1237d3c35 to your computer and use it in GitHub Desktop.
"""
title: Autogen Action
author: matthewh
version: 2.0
required_open_webui_version: 0.3.9
Instructions:
Install and run the UI like this:
pip install autogenstudio flaml[automl] matplotlib
autogenstudio ui --port=8081 --docs --host 0.0.0.0 --workers=2
Browse to http://localhost:8081/ and construct your model/agent/workflow.
I am using LM Studio with Qwen-2.5.
Then export the workflow and host it as an endpoint
(**EXPORT! DO NOT DOWNLOAD** — this had me stumped for an hour):
autogenstudio serve --workflow=workflow.json --port=8082 --host 0.0.0.0 --docs --workers=2
Browse to http://localhost:8082/docs, and you can infer multi-agent goodness.
"""
import os
import aiohttp
import asyncio
import urllib.parse
import time
from pydantic import BaseModel, Field
from typing import Optional, Callable, Awaitable, Dict, Any, List
class Action:
    """Open WebUI Action that forwards the most recent user message to an
    AutoGen Studio workflow endpoint and writes the reply into the chat.

    Depending on the ``overwrite`` valve, the reply either replaces the last
    assistant message (overwrite mode) or is appended as a new message via
    the event emitter (amend mode).
    """

    class Valves(BaseModel):
        """Configurable parameters (valves) for the AutoGen action."""

        AUTOGEN_BASE_URL: str = Field(
            default=os.getenv(
                "AUTOGEN_BASE_URL", "http://host.docker.internal:8082/predict"
            ),
            description="Base URL for the AutoGen endpoint.",
        )
        emit_interval: float = Field(
            default=1.0, description="Interval in seconds between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions."
        )
        request_timeout: int = Field(
            default=300, description="Timeout for the HTTP client session in seconds."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )
        overwrite: bool = Field(
            default=True,
            description="If True, replaces the assistant's message; if False, amends the new response using event emitter.",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Set to signal the periodic status emitter task to stop.
        self.stop_emitter = asyncio.Event()
        if self.valves.debug:
            print(f"Initialized with endpoint: {self.valves.AUTOGEN_BASE_URL}")

    async def action(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Optional[dict]:
        """Handle one action invocation.

        :param body: Request body containing the chat ``messages`` list.
        :param __user__: Optional user context (unused).
        :param __event_emitter__: Optional coroutine used to emit UI events.
        :return: The modified body (overwrite mode), an error dict, or
            ``None`` (amend mode, so the UI keeps its message list).
        """
        if self.valves.debug:
            print("Starting action with body:", body)

        status_task = None
        if __event_emitter__ and self.valves.enable_status_indicator:
            self.stop_emitter.clear()
            status_task = asyncio.create_task(
                self.emit_periodic_status(
                    __event_emitter__,
                    "Autogen... Processing request",
                    self.valves.emit_interval,
                )
            )

        try:
            messages = body.get("messages", [])
            if not messages:
                if self.valves.debug:
                    print("No messages found in the request body.")
                return {"error": "No messages found."}

            # Get the most recent user message
            prompt = self.get_most_recent_user_message(messages)
            if not prompt:
                return {"error": "No valid user message found."}
            if self.valves.debug:
                print(f"Prompt extracted: {prompt}")

            encoded_prompt = self.clean_prompt(prompt)
            if self.valves.debug:
                print(f"Encoded prompt: {encoded_prompt}")

            start_time = time.time()
            await self.emit_status(
                __event_emitter__,
                "info",
                "Waiting for response from AutoGen endpoint.",
                False,
            )

            response = await self.autogen_process(encoded_prompt, __event_emitter__)
            if self.valves.debug:
                print(f"Initial response from endpoint: {response}")
            if response.startswith("Error:"):
                await self.emit_status(__event_emitter__, "error", response, True)
                return {"error": response}

            if self.valves.overwrite:
                # Overwrite mode: replace the last assistant message, or
                # append a new one if none exists.
                updated_message = None
                for message in reversed(messages):
                    if message.get("role") == "assistant":
                        message["content"] = response
                        updated_message = message
                        if self.valves.debug:
                            print(
                                "Overwritten existing assistant message (overwrite mode)."
                            )
                        break
                if updated_message is None:
                    # No existing assistant message: append a new one.
                    updated_message = {"role": "assistant", "content": response}
                    messages.append(updated_message)
                    if self.valves.debug:
                        print("Appended new assistant message (overwrite mode).")

                elapsed_time = time.time() - start_time
                if self.valves.debug:
                    print(f"Final response being returned: {body}")
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"Autogen process completed successfully. (elapsed: {elapsed_time:.1f}s)",
                    True,
                )
                # Emit 'message_update' so the UI refreshes.  Fix: report the
                # message that was actually updated — the overwritten
                # assistant message is not necessarily messages[-1].
                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "message_update",
                            "data": {
                                "messages": messages,
                                "updated_message": updated_message,
                            },
                        }
                    )
                # Return the modified body to ensure updated messages are included
                return body
            else:
                # Amend mode: emit a new message via event emitter and do NOT
                # return the body (returning it would reset the UI messages).
                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "message",  # Using 'message' type to append
                            "data": {"content": response},
                            # No 'done' condition needed
                        }
                    )
                    if self.valves.debug:
                        print("Emitted new message via event emitter (amend mode).")
                else:
                    # Fallback: append to messages if event emitter is not available
                    messages.append({"role": "assistant", "content": response})
                    if self.valves.debug:
                        print(
                            "Appended new assistant message via fallback (amend mode)."
                        )

                elapsed_time = time.time() - start_time
                if self.valves.debug:
                    print(f"Final response being returned: {body}")
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"Autogen process completed successfully. (elapsed: {elapsed_time:.1f}s)",
                    True,
                )
                return None
        finally:
            # Always stop and await the periodic status emitter.
            if status_task:
                self.stop_emitter.set()
                await status_task

    def get_most_recent_user_message(
        self, messages: List[Dict[str, Any]]
    ) -> Optional[str]:
        """Retrieve the most recent user message from the list of messages."""
        for message in reversed(messages):
            if message.get("role") == "user" and "content" in message:
                return message["content"]
        return None

    def clean_prompt(self, prompt: str) -> str:
        """Percent-encode the prompt so it is safe as a URL path segment
        (safe="" also escapes '/')."""
        escaped_prompt = urllib.parse.quote(prompt, safe="")
        if self.valves.debug:
            print(f"Cleaned prompt: {escaped_prompt}")
        return escaped_prompt

    async def autogen_process(
        self,
        prompt: str,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> str:
        """GET the AutoGen endpoint with the encoded prompt and extract a
        textual reply from the known response shapes.

        Always returns a string; failures are reported as ``"Error: ..."``.
        """
        base_url = self.valves.AUTOGEN_BASE_URL
        timeout = aiohttp.ClientTimeout(total=self.valves.request_timeout)
        url_with_prompt = f"{base_url}/{prompt}"
        headers = {"Accept": "application/json"}
        if self.valves.debug:
            print(
                f"Sending request to {url_with_prompt} with timeout {self.valves.request_timeout}"
            )
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url_with_prompt, headers=headers) as response:
                    if self.valves.debug:
                        print(f"Received status code: {response.status}")
                    response.raise_for_status()
                    data = await response.json()
                    # Handle different response structures
                    if self.valves.debug:
                        print(f"Received data: {data}")
                    if isinstance(data, dict):
                        if "response" in data:
                            return data["response"]
                        if "message" in data and "content" in data["message"]:
                            return data["message"]["content"]
                        if (
                            "meta" in data.get("data", {})
                            and "messages" in data["data"]["meta"]
                        ):
                            # Get the second to last message content instead
                            # of the last (the last entry is a terminator).
                            meta_messages = data["data"]["meta"]["messages"]
                            for message in reversed(meta_messages[:-1]):
                                if (
                                    isinstance(message, dict)
                                    and "message" in message
                                    and "content" in message["message"]
                                ):
                                    return message["message"]["content"]
                            return "No valid response content found."
                    # Fix: a dict with unrecognized keys previously fell
                    # through and implicitly returned None, crashing the
                    # caller's str.startswith check.
                    return "Unexpected response format."
        except Exception as e:
            error_message = f"Error during request: {str(e)}"
            if self.valves.debug:
                print(error_message)
            await self.emit_status(__event_emitter__, "error", error_message, True)
            return f"Error: {str(e)}"

    async def emit_periodic_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        message: str,
        interval: float,
    ):
        """Emit an in-progress status every ``interval`` seconds until
        ``stop_emitter`` is set."""
        start_time = time.time()
        try:
            while not self.stop_emitter.is_set():
                elapsed_time = time.time() - start_time
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"{message} (elapsed: {elapsed_time:.1f}s)",
                    False,
                )
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            if self.valves.debug:
                print("Periodic status emitter cancelled.")

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        """Emit a single status event if an emitter is available."""
        if __event_emitter__:
            event = {
                "type": "status",
                "data": {
                    "status": "complete" if done else "in_progress",
                    "level": level,
                    "description": message,
                    "done": done,
                },
            }
            if self.valves.debug:
                print(f"Emitting status event: {event}")
            await __event_emitter__(event)

    async def on_start(self):
        """Lifecycle hook: called when the action is started."""
        if self.valves.debug:
            print("Autogen Action started")

    async def on_stop(self):
        """Lifecycle hook: called when the action is stopped."""
        if self.valves.debug:
            print("Autogen Action stopped")
"""
title: Autogen Pipeline
author: matthewh
version: 1.4
required_open_webui_version: 0.3.9
Instructions:
1. Install and run the UI:
pip install autogenstudio flaml[automl] matplotlib
2. Start the UI:
autogenstudio ui --port=8081 --docs --host=0.0.0.0 --workers=2
3. Construct your model/agent/workflow at:
http://localhost:8081/
4. Export the workflow as an endpoint:
autogenstudio serve --workflow=workflow.json --port=8082 --host=0.0.0.0 --docs --workers=2
5. Use the endpoint at:
http://localhost:8082/docs
"""
from typing import Optional, Callable, Awaitable, Dict, Any, List, Union
import aiohttp
import json
import urllib.parse
import time
import asyncio
from pydantic import BaseModel, Field
class Pipe:
    """Pipeline for managing interactions with the AutoGen endpoint."""

    class Valves(BaseModel):
        """Configurable parameters (valves) for the AutoGen pipeline."""

        AUTOGEN_BASE_URL: str = Field(
            default="http://host.docker.internal:8082/predict",
            description="Base URL for the AutoGen endpoint.",
        )
        emit_interval: float = Field(
            default=1.0, description="Interval in seconds between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions."
        )
        request_timeout: int = Field(
            default=300, description="Timeout for the HTTP client session in seconds."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )
        history_length: int = Field(
            default=1,
            description="Number of messages to include from the chat history, starting from the most recent. Default is 1, which includes the user's message and the previous assistant message if available.",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Set to signal the periodic status emitter task to stop.
        self.stop_emitter = asyncio.Event()

    async def emit_periodic_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        message: str,
        interval: float,
    ):
        """Emit status updates periodically until the stop event is set."""
        start_time = time.time()
        try:
            while not self.stop_emitter.is_set():
                elapsed_time = time.time() - start_time
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"{message} (elapsed: {elapsed_time:.1f}s)",
                    False,
                )
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            if self.valves.debug:
                print("[DEBUG] Periodic status emitter cancelled.")

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        # Fix: the emitter defaults to None, so the annotation must be
        # Optional (the original annotated a plain Callable with default None).
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Main handler for the AutoGen API interactions.

        :param body: Request body containing the chat ``messages`` list.
        :param __user__: Optional user context (unused).
        :param __event_emitter__: Optional coroutine used to emit UI events.
        :return: The extracted response content, or an error dict.
        """
        status_task = None
        start_time = time.time()
        try:
            # Start emitting status updates periodically
            if __event_emitter__ and self.valves.enable_status_indicator:
                self.stop_emitter.clear()
                status_task = asyncio.create_task(
                    self.emit_periodic_status(
                        __event_emitter__,
                        "Processing request to AutoGen endpoint...",
                        self.valves.emit_interval,
                    )
                )

            # Extract messages and prepare prompt
            messages = body.get("messages", [])
            if not messages:
                return {"error": "No messages found in the request body"}
            prompt = self._get_combined_prompt(messages)

            # Prepare and make the API request
            response = await self.call_autogen_api(prompt)

            # Extract content from the response
            extracted_content = self._extract_content(response)

            # Emit completed status with elapsed time
            elapsed_time = time.time() - start_time
            await self.emit_status(
                __event_emitter__,
                "info",
                f"Autogen - Pipe Completed (elapsed: {elapsed_time:.1f}s)",
                True,
            )
            return extracted_content
        except Exception as e:
            # Emit error status
            await self.emit_status(__event_emitter__, "error", f"Error: {str(e)}", True)
            if self.valves.debug:
                print(f"[DEBUG] Error during pipe: {e}")
            return {"error": str(e)}
        finally:
            # Stop the periodic status emitter
            if status_task:
                self.stop_emitter.set()
                await status_task

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        """Emit a single status event if an emitter is available.

        NOTE(review): ``level`` is currently not included in the event
        payload; kept as a parameter for interface compatibility.
        """
        if __event_emitter__:
            event = {
                "type": "status",
                "data": {"description": message, "done": done},
            }
            if self.valves.debug:
                print(f"[DEBUG] Emitting status event: {event}")
            await __event_emitter__(event)

    def _get_combined_prompt(self, messages: List[Dict[str, str]]) -> str:
        """Combine the user's message and previous assistant messages based
        on the history length valve, labeling each line with its role."""
        prompt_parts = []
        # Each user message is paired with an assistant message, so the
        # valve value is doubled to count raw messages.
        history_length = self.valves.history_length * 2
        recent_messages = (
            messages[-history_length:] if history_length < len(messages) else messages
        )
        # Iterate through messages to form a cohesive prompt with roles labeled
        for message in recent_messages:
            role = message.get("role", "user")
            content = message.get("content", "")
            if role == "user":
                prompt_parts.append(f"User: {content}")
            elif role == "assistant":
                prompt_parts.append(f"Assistant: {content}")
        # Join all parts with newlines to form the full prompt
        combined_prompt = "\n".join(prompt_parts)
        if self.valves.debug:
            print(f"[DEBUG] Combined prompt: {combined_prompt}")
        return combined_prompt

    async def call_autogen_api(self, prompt: str) -> str:
        """Make an asynchronous GET call to the AutoGen API and return the
        raw response body text.

        :raises ValueError: if the endpoint returns a non-200 status.
        """
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.valves.request_timeout)
        ) as session:
            # Fix: quote with safe="" so '/' in the prompt is escaped too
            # (quote's default leaves '/' unescaped, corrupting the URL
            # path); this matches the encoding used by the Action variant.
            url = f"{self.valves.AUTOGEN_BASE_URL}/{urllib.parse.quote(prompt, safe='')}"
            headers = {
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
            if self.valves.debug:
                print(f"[DEBUG] Calling API at: {url}")
                print(f"[DEBUG] Headers: {headers}")
            async with session.get(url, headers=headers) as response:
                if response.status == 200:
                    data = await response.text()
                    if self.valves.debug:
                        print(f"[DEBUG] API Response: {data}")
                    return data
                else:
                    raise ValueError(f"API call failed with status {response.status}")

    def _extract_content(self, response: str) -> str:
        """Extract the reply content from the raw JSON API response.

        The endpoint nests messages under data.meta.messages; the
        second-to-last entry holds the assistant reply (the last entry is a
        terminator).
        """
        try:
            data = json.loads(response)
            meta = data.get("data", {}).get("meta", {})
            messages = meta.get("messages", [])
            if len(messages) > 1:
                last_message = messages[-2].get("message", {}).get("content", "")
                if self.valves.debug:
                    print(f"[DEBUG] Extracted content: {last_message}")
                return last_message
            else:
                return "No valid content found in response"
        except json.JSONDecodeError as e:
            if self.valves.debug:
                print(f"[DEBUG] JSON parsing error: {e}")
            return "Error extracting content due to JSON parsing error"
        except Exception as e:
            if self.valves.debug:
                print(f"[DEBUG] Error extracting content: {e}")
            return "Error extracting content"
"""
title: Autogen Tool
author: matthewh
version: 1.0
required_open_webui_version: 0.3.9
Instructions:
1. Install and run AutoGen Studio:
```bash
pip install autogenstudio flaml[automl] matplotlib
autogenstudio ui --port=8081 --docs --host=0.0.0.0 --workers=2
```
2. Export the workflow and serve it as an endpoint:
```bash
autogenstudio serve --workflow=workflow.json --port=8082 --host=0.0.0.0 --docs --workers=2
```
3. Use http://localhost:8082/docs to interact with the endpoint.
4. This tool sends a query to the AutoGen API and displays the response correctly in the chat.
"""
import os
import requests
import urllib.parse
from typing import Optional, Dict # Ensure Dict is imported
from pydantic import BaseModel, Field
class Tools:
    """
    Tools class to house the AutoGen tool functions, compatible with Open WebUI.
    """

    class Valves(BaseModel):
        """
        Configurable parameters (valves) for the AutoGen tool.
        """

        AUTOGEN_BASE_URL: str = Field(
            default=os.getenv(
                "AUTOGEN_BASE_URL", "http://host.docker.internal:8082/predict"
            ),
            description="Base URL for the AutoGen endpoint.",
        )
        request_timeout: int = Field(
            default=300, description="Timeout for the HTTP request (in seconds)."
        )
        debug: bool = Field(
            default=False, description="Enable or disable debug logging."
        )

    def __init__(self):
        self.valves = self.Valves()

    def autogen_tool(self, query: str) -> Dict[str, str]:
        """
        AutoGen tool that performs a query using the AutoGen API.

        :param query: User's input query.
        :return: A dictionary with a "content" key containing either a
            formatted response for the LLM or an error description.
        """
        if self.valves.debug:
            print(f"[DEBUG] Starting tool with query: {query}")

        # safe="" also escapes '/', keeping the query a single path segment.
        encoded_prompt = urllib.parse.quote(query, safe="")
        url = f"{self.valves.AUTOGEN_BASE_URL}/{encoded_prompt}"
        headers = {"Accept": "application/json"}
        timeout = self.valves.request_timeout
        if self.valves.debug:
            print(f"[DEBUG] Sending request to {url}")

        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()
            if self.valves.debug:
                print(f"[DEBUG] Response data: {data}")
            # Format the response as an LLM-friendly prompt
            if isinstance(data, dict) and "response" in data:
                message = data["response"]
                formatted_message = (
                    f"Provide the following response to the user:\n"
                    f"```autogen\n{message}\n```"
                )
                return {"content": formatted_message}
            return {"content": "Unexpected response format."}
        except Exception as e:
            error_message = f"Error during request: {str(e)}"
            if self.valves.debug:
                print(f"[DEBUG] {error_message}")
            return {"content": error_message}
# # Example usage for local testing
# if __name__ == "__main__":
# tools = Tools() # Initialize without custom valves
# query = "example query"
# result = tools.autogen_tool(query)
# print(result["content"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment