Created
December 12, 2024 22:33
-
-
Save cnndabbler/6ee1b89044306e8073356e59f4cb0bac to your computer and use it in GitHub Desktop.
This script implements a sophisticated movie purchase system using OpenAI's Swarm framework for multi-agent orchestration. The system handles the entire flow from movie selection to shipping using specialized AI agents working in concert.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Movie Purchase System with Multi-Agent Orchestration | |
This script implements a sophisticated movie purchase system using OpenAI's Swarm framework | |
for multi-agent orchestration. The system handles the entire flow from movie selection to | |
shipping using specialized AI agents working in concert. | |
Key Components: | |
- Supervisor Agent: Orchestrates the overall purchase flow | |
- Buy Movie Agent: Handles pricing and purchase transactions | |
- Ship Movie Agent: Manages shipping logistics | |
- Sequence Diagram Generation: Visualizes agent interactions for debugging | |
The system maintains a shared context between agents and provides rich console output | |
for status updates and transaction monitoring. | |
Dependencies: | |
- OpenAI API | |
- Swarm Framework | |
- Rich (for console output) | |
- UUID (for transaction IDs) | |
- Mermaid (for sequence diagrams) | |
Author: [Your Name] | |
Date: December 2024 | |
""" | |
from openai import OpenAI | |
from swarm import Swarm, Agent | |
from swarm.types import Response, Result | |
from rich import print | |
from rich.console import Console | |
from rich.prompt import Prompt | |
from rich.panel import Panel | |
from rich.table import Table | |
from rich import box | |
import uuid | |
import datetime | |
import subprocess | |
import os | |
import json | |
# Initialize Rich console for enhanced output | |
console = Console() | |
# Initialize OpenAI client and Swarm crew | |
client = OpenAI() | |
crew = Swarm(client=client) | |
# Model configuration | |
model="gpt-4o-mini" | |
# Shared context for all agents | |
context_variables = { | |
"query": "", # User's original movie request | |
"title": None, # Movie title extracted from query | |
"year": None, # Movie release year | |
"price": None, # Computed price of the movie | |
"sales_done": False, # Flag indicating if sale is complete | |
"shipping_done": False, # Flag indicating if shipping is complete | |
"transaction_id": None, # Unique identifier for transaction | |
"timestamp": None, # Transaction timestamp | |
} | |
# Store sequence diagram interactions | |
sequence_interactions = [] | |
def log_interaction(source, target, message, is_activation=False, is_deactivation=False, is_note=False, state_update=None): | |
""" | |
Log an interaction between agents for the sequence diagram. | |
Args: | |
source (str): The source agent/component initiating the interaction | |
target (str): The target agent/component receiving the interaction | |
message (str): The content of the interaction | |
is_activation (bool, optional): Whether this starts a new activation. Defaults to False. | |
is_deactivation (bool, optional): Whether this ends an activation. Defaults to False. | |
is_note (bool, optional): Whether this is a note in the sequence. Defaults to False. | |
state_update (str, optional): Any state changes to record. Defaults to None. | |
Returns: | |
None | |
""" | |
sequence_interactions.append({ | |
'source': source, | |
'target': target, | |
'message': message, | |
'is_activation': is_activation, | |
'is_deactivation': is_deactivation, | |
'is_note': is_note, | |
'state_update': state_update, | |
'timestamp': datetime.datetime.now().isoformat() | |
}) | |
def log_context_update(source, updates): | |
""" | |
Log updates to the context variables. | |
Args: | |
source (str): The component making the update | |
updates (dict): The updates being made to context | |
Returns: | |
None | |
""" | |
current_state = { | |
k: v for k, v in context_variables.items() | |
if v is not None and k != 'query' | |
} | |
state_msg = "<br/>".join([f"{k}: {v}" for k, v in current_state.items()]) | |
log_interaction( | |
source=source, | |
target="Context", | |
message="Context Update", | |
is_note=True, | |
state_update=f"Current State:<br/>{state_msg}" | |
) | |
def get_price(title, year): | |
""" | |
Calculate and set the price for a movie. | |
Args: | |
title (str): The name of the movie | |
year (str): The release year of the movie | |
Returns: | |
Result: A Result object containing the price and updated context | |
""" | |
log_interaction("BuyMovie", "Context", f"Check title={title} & year={year}", is_activation=True) | |
dvd_fee="$19.99" | |
console.print(f"The price for {title} first release in {year} is {dvd_fee}") | |
context_variables["price"] = dvd_fee | |
log_interaction("BuyMovie", "Context", f"Set price={dvd_fee}") | |
if "transaction_id" not in context_variables or not context_variables["transaction_id"]: | |
transaction_info = initialize_transaction() | |
context_variables.update(transaction_info) | |
context_variables["sales_done"] = True | |
log_interaction("BuyMovie", "Context", f"Generate transaction_id={transaction_info['transaction_id']}") | |
log_interaction("BuyMovie", "Context", "Set sales_done=true") | |
log_context_update("BuyMovie", transaction_info) | |
console.print("Buying the movie completed") | |
console.print(context_variables) | |
log_interaction("BuyMovie", "Supervisor", "Done buying movie", is_deactivation=True) | |
return Result( | |
value="Done buying the movie", | |
agent=supervisor_agent, | |
context_variables=context_variables | |
) | |
def generate_transaction_id(): | |
""" | |
Generate a unique transaction ID with DVD prefix. | |
Returns: | |
str: A unique transaction ID in the format 'DVD-UUID' | |
""" | |
return f"DVD-{str(uuid.uuid4())}" | |
def initialize_transaction(): | |
""" | |
Initialize a new transaction with tracking information. | |
Returns: | |
dict: Transaction details including ID and timestamp | |
""" | |
return { | |
"transaction_id": generate_transaction_id(), | |
"timestamp": datetime.datetime.now().isoformat() | |
} | |
buy_movie_agent = Agent( | |
name="buy_movie_agent", | |
model=model, | |
instructions="""You are a sales agent. Your ONLY task is to get the price for a movie using the get_price function. | |
IMPORTANT: You must ALWAYS call get_price with the title and year provided in the context variables. | |
DO NOT proceed with any other actions. | |
DO NOT engage in conversation. | |
DO NOT ask questions. | |
JUST call get_price immediately. | |
Example system message you'll receive: | |
"Get price for movie: Napoleon (1922)" | |
Your response should be to call: | |
get_price(title="Napoleon", year="1922")""", | |
context_variables=context_variables, | |
functions=[get_price] | |
) | |
def process_shipping(): | |
""" | |
Process the shipping of a purchased movie. | |
This function checks prerequisites (sale completion) and updates shipping status. | |
It ensures that shipping only occurs after a successful purchase. | |
Returns: | |
Result: A Result object containing shipping status and updated context | |
""" | |
log_interaction("ShipMovie", "Context", "Checking prerequisites", is_activation=True) | |
if not context_variables["sales_done"] or not context_variables["price"]: | |
log_interaction("ShipMovie", "Context", "Prerequisites check failed - Cannot ship before purchase", is_note=True) | |
log_interaction("ShipMovie", "Supervisor", "Error: Cannot ship before purchase", is_deactivation=True) | |
return Result( | |
value="Error: Cannot ship before purchase is complete", | |
agent=supervisor_agent, | |
context_variables=context_variables | |
) | |
log_interaction("ShipMovie", "Context", "Prerequisites check passed", is_note=True) | |
console.print(f"\n[cyan]Shipping movie {context_variables['title']} (ID: {context_variables['transaction_id']}) to {context_variables['shipping_address']}[/cyan]") | |
context_variables["shipping_done"] = True | |
log_interaction("ShipMovie", "Context", "Set shipping_done=true") | |
log_context_update("ShipMovie", {"shipping_done": True}) | |
console.print("[green]Shipping completed [/green]") | |
log_interaction("ShipMovie", "Supervisor", "Shipping completed", is_deactivation=True) | |
return Result( | |
value=f"Successfully shipped movie {context_variables['title']} to {context_variables['shipping_address']}", | |
agent=supervisor_agent, | |
context_variables=context_variables | |
) | |
def ship_movie(): | |
""" | |
Initiate the shipping process for a movie. | |
This function handles the shipping process by: | |
1. Collecting shipping address | |
2. Updating context with address | |
3. Initiating the shipping process | |
Returns: | |
Result: A Result object transferring control to the shipping agent | |
""" | |
console.print("\n[bold cyan]Shipping Process:[/bold cyan]") | |
console.print("supervisor_agent ===> shipping_agent \n") | |
shipping_address = Prompt.ask("[bold cyan]Please enter your shipping address[/bold cyan]") | |
context_variables["shipping_address"] = shipping_address | |
console.print(f"[dim]User shipping address: {shipping_address}[/dim]") | |
console.print("\n[cyan]Current Status:[/cyan]") | |
display_transaction_status(context_variables) | |
log_interaction("User", "Supervisor", f"Address: {shipping_address}") | |
log_interaction("Supervisor", "Context", f"Set shipping_address={shipping_address}") | |
log_interaction("Supervisor", "ShipMovie", "Process shipping", is_activation=True) | |
console.print("\n[cyan]Processing shipping request...[/cyan]") | |
return Result( | |
value="Transferring to shipping agent", | |
agent=ship_movie_agent, | |
context_variables=context_variables | |
) | |
def buy_movie(title, year): | |
""" | |
Process the purchase of a movie. | |
Args: | |
title (str): The title of the movie to purchase | |
year (str): The release year of the movie | |
Returns: | |
Result: A Result object containing purchase status and updated context | |
""" | |
log_interaction("Supervisor", "BuyMovie", "Process purchase", is_activation=True) | |
context_variables["title"] = title | |
context_variables["year"] = year | |
log_interaction("Supervisor", "Context", f"Set title={title}, year={year}") | |
return Result( | |
value=f"Processing purchase for {title} ({year})", | |
agent=buy_movie_agent, | |
context_variables=context_variables | |
) | |
def generate_sequence_diagram(): | |
""" | |
Generate a Mermaid sequence diagram showing the flow of interactions and context evolution. | |
Returns: | |
str: The Mermaid diagram definition | |
""" | |
# Header with title and styling | |
diagram = [ | |
"%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#ffffff', 'primaryTextColor': '#000000', 'primaryBorderColor': '#7C0000', 'lineColor': '#000000', 'secondaryColor': '#006100', 'tertiaryColor': '#fff', 'noteTextColor': '#000000', 'noteBorderColor': '#000000', 'noteBkgColor': '#fff5ad' }}}%%", | |
"sequenceDiagram", | |
"title Movie Purchase and Shipping Flow", | |
"participant User", | |
"participant Supervisor", | |
"participant BuyAgent", | |
"participant ShipAgent", | |
"participant Context" | |
] | |
last_state = {} | |
context_updates = [] | |
for interaction in sequence_interactions: | |
# Format the message for better readability | |
message = interaction['message'].replace("=", ": ") | |
# Map short names to full names | |
source_map = {"U": "User", "S": "Supervisor", "B": "BuyAgent", "H": "ShipAgent", "C": "Context"} | |
source = source_map[interaction['source'][0]] | |
target = source_map[interaction['target'][0]] | |
# Track context updates | |
if interaction['state_update']: | |
new_state = {} | |
for line in interaction['state_update'].split("<br/>"): | |
if ": " in line: | |
key, value = line.split(": ", 1) | |
new_state[key.strip()] = value.strip() | |
# Track all meaningful changes | |
tracked_fields = [ | |
'title', 'year', 'price', 'transaction_id', 'shipping_address', | |
'payment_status', 'shipping_status' | |
] | |
changes = {k: v for k, v in new_state.items() | |
if k in tracked_fields and (k not in last_state or last_state[k] != v)} | |
if changes: | |
context_updates.append({ | |
'source': source, | |
'target': target, | |
'changes': changes | |
}) | |
last_state.update(new_state) | |
# Add interactions with proper styling | |
if interaction['is_note']: | |
if not message.startswith("Context Update") and not message.startswith("State"): | |
diagram.append(f"Note over {source}: {message}") | |
elif interaction['is_activation']: | |
diagram.append(f"{source}->>+{target}: {message}") | |
# Show context update after activation if exists | |
for update in context_updates: | |
if update['source'] == source and update['target'] == target: | |
change_msg = "<br>".join(f"{k}: {v}" for k, v in update['changes'].items()) | |
diagram.append(f"Note over Context: Context Updated<br>{change_msg}") | |
elif interaction['is_deactivation']: | |
diagram.append(f"{source}-->>-{target}: {message}") | |
else: | |
diagram.append(f"{source}->>+{target}: {message}") | |
context_updates = [] # Clear processed updates | |
# Add final transaction summary with all important context | |
important_fields = ['title', 'year', 'price', 'transaction_id', 'shipping_address', | |
'payment_status', 'shipping_status'] | |
final_state = {k: last_state[k] for k in important_fields if k in last_state} | |
if final_state: | |
final_summary = "<br>".join(f"{k}: {v}" for k, v in final_state.items()) | |
diagram.append(f"Note over User,Context: Final Transaction State<br>{final_summary}") | |
return "\n".join(diagram) | |
def save_diagrams(): | |
""" | |
Save the sequence diagram as both a Mermaid file and PNG with timestamp. | |
""" | |
from datetime import datetime | |
# Generate timestamp in the format YYYYMMDD_HHMMSS | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
png_filename = f"sequence_diagram_{timestamp}.png" | |
diagram_content = generate_sequence_diagram() | |
# Save Mermaid file | |
with open("sequence_diagram.mmd", "w") as f: | |
f.write(diagram_content) | |
# Use mmdc to generate PNG with white background | |
console.print(f"\n[cyan]Generating sequence diagram PNG with timestamp: {timestamp}...[/cyan]") | |
subprocess.run([ | |
"mmdc", | |
"-i", "sequence_diagram.mmd", | |
"-o", png_filename, | |
"-b", "white", # Ensure white background | |
"-t", "default" | |
]) | |
console.print(f"[green]✓[/green] Sequence diagram saved as '{png_filename}'") | |
def display_transaction_status(context_variables): | |
""" | |
Display current transaction status in a rich formatted table. | |
Args: | |
context_variables (dict): The current context variables to display | |
Returns: | |
None | |
""" | |
table = Table(title="Transaction Status", box=box.ROUNDED) | |
table.add_column("Field", style="cyan") | |
table.add_column("Value", style="green") | |
for key, value in context_variables.items(): | |
if value is not None and key != "query": | |
table.add_row(key, str(value)) | |
console.print(table) | |
def run_with_logging(crew, model_override, agent, context_variables, messages, debug=False): | |
""" | |
Run the swarm with logging of interactions. | |
Args: | |
crew (Swarm): The Swarm instance to use | |
model_override (str): The model to use for this run | |
agent (Agent): The initial agent to start with | |
context_variables (dict): Shared context between agents | |
messages (list): Messages to process | |
debug (bool, optional): Whether to enable debug output. Defaults to False. | |
Returns: | |
Response: The response from the Swarm run | |
""" | |
initial_msg = messages[0]['content'] | |
log_interaction("User", "Supervisor", initial_msg) | |
log_context_update("System", {}) | |
console.print("\n[cyan]Processing your request...[/cyan]") | |
response = crew.run( | |
model_override=model_override, | |
agent=agent, | |
context_variables=context_variables, | |
messages=messages, | |
debug=debug | |
) | |
return response | |
ship_movie_agent = Agent( | |
name="ship_movie_agent", | |
model=model, | |
instructions="""You are a shipping agent. Your task is to ship the movie to the customer's address. | |
IMPORTANT: | |
- Only ship if the movie has been purchased (check sales_done and price in context_variables) | |
- You must call process_shipping to handle the shipping process | |
- DO NOT engage in conversation | |
- DO NOT ask for the address (it's already in context_variables['shipping_address']) | |
- JUST call process_shipping immediately""", | |
context_variables=context_variables, | |
functions=[process_shipping] | |
) | |
## Supervisor Agent | |
supervisor_agent = Agent( | |
name="supervisor", | |
model=model, | |
instructions=""" | |
GOAL: | |
You help the user buy a movie and ship it to their address. | |
FOLLOW THIS FLOW STRICTLY: | |
1. Extract the movie title and year from the query | |
2. Call buy_movie with the title and year | |
3. WAIT for the price to be computed (context_variables["price"] must not be None) | |
4. Only after the price is set, proceed with ship_movie | |
IMPORTANT: | |
- DO NOT call ship_movie until the price has been set | |
- Check context_variables["price"] before proceeding to shipping | |
- Only make one function call at a time | |
EXAMPLES: | |
User: "I want to buy the Matrix released in 1981" | |
Action: Call buy_movie with title="Matrix", year="1981" | |
Then: Wait for price to be set before calling ship_movie | |
""", | |
context_variables=context_variables, | |
functions=[buy_movie, ship_movie] | |
) | |
def main(): | |
""" | |
Main function to run the movie purchase system. | |
This function: | |
1. Displays welcome message | |
2. Gets movie purchase request | |
3. Processes the transaction | |
4. Generates documentation | |
""" | |
console.print(Panel( | |
"[bold green]Welcome to the Movie Purchase System![/bold green]\n" | |
"I'll help you purchase and ship your movie today.", | |
border_style="green" | |
)) | |
try: | |
query = Prompt.ask("\n[bold cyan]What movie would you like to buy?[/bold cyan]") | |
context_variables["query"] = query | |
agent = supervisor_agent | |
console.print("\n[bold cyan]Initial Status:[/bold cyan]") | |
display_transaction_status(context_variables) | |
response = run_with_logging( | |
crew, | |
model_override=model, | |
agent=agent, | |
context_variables=context_variables, | |
messages=[{"role": "user", "content": query}], | |
debug=False | |
) | |
console.print("\n[bold cyan]Final Transaction Status:[/bold cyan]") | |
display_transaction_status(response.context_variables) | |
console.print("\n[cyan]Generating sequence diagram...[/cyan]") | |
save_diagrams() | |
console.print("\n[bold green] Transaction Complete! Your sequence diagram has been saved.[/bold green]") | |
except KeyboardInterrupt: | |
console.print("\n[bold red]Transaction cancelled by user.[/bold red]") | |
except Exception as e: | |
console.print(f"\n[bold red]Error: {str(e)}[/bold red]") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment