Skip to content

Instantly share code, notes, and snippets.

@cnndabbler
Created December 12, 2024 22:33
Show Gist options
  • Save cnndabbler/6ee1b89044306e8073356e59f4cb0bac to your computer and use it in GitHub Desktop.
Save cnndabbler/6ee1b89044306e8073356e59f4cb0bac to your computer and use it in GitHub Desktop.
This script implements a sophisticated movie purchase system using OpenAI's Swarm framework for multi-agent orchestration. The system handles the entire flow from movie selection to shipping using specialized AI agents working in concert.
"""
Movie Purchase System with Multi-Agent Orchestration
This script implements a sophisticated movie purchase system using OpenAI's Swarm framework
for multi-agent orchestration. The system handles the entire flow from movie selection to
shipping using specialized AI agents working in concert.
Key Components:
- Supervisor Agent: Orchestrates the overall purchase flow
- Buy Movie Agent: Handles pricing and purchase transactions
- Ship Movie Agent: Manages shipping logistics
- Sequence Diagram Generation: Visualizes agent interactions for debugging
The system maintains a shared context between agents and provides rich console output
for status updates and transaction monitoring.
Dependencies:
- OpenAI API
- Swarm Framework
- Rich (for console output)
- UUID (for transaction IDs)
- Mermaid CLI (`mmdc`, used to render the sequence diagram to PNG)
Author: [Your Name]
Date: December 2024
"""
from openai import OpenAI
from swarm import Swarm, Agent
from swarm.types import Response, Result
from rich import print
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
from rich.table import Table
from rich import box
import uuid
import datetime
import subprocess
import os
import json
# Initialize Rich console for enhanced output; every status message in this
# file is printed through this single instance.
console = Console()
# Initialize OpenAI client and Swarm crew. Both are created at import time.
# NOTE(review): OpenAI() presumably reads its credentials from the
# environment -- confirm before importing this module in tests.
client = OpenAI()
crew = Swarm(client=client)
# Model configuration: used for every Agent below and as model_override in main()
model="gpt-4o-mini"
# Shared context for all agents. This dict is mutated in place by the tool
# functions (buy_movie, get_price, ship_movie, process_shipping).
# ship_movie() later adds a "shipping_address" key that is not listed here.
context_variables = {
    "query": "", # User's original movie request
    "title": None, # Movie title extracted from query
    "year": None, # Movie release year
    "price": None, # Computed price of the movie
    "sales_done": False, # Flag indicating if sale is complete
    "shipping_done": False, # Flag indicating if shipping is complete
    "transaction_id": None, # Unique identifier for transaction
    "timestamp": None, # Transaction timestamp
}
# Store sequence diagram interactions: appended to by log_interaction() and
# read by generate_sequence_diagram()
sequence_interactions = []
def log_interaction(source, target, message, is_activation=False, is_deactivation=False, is_note=False, state_update=None):
    """
    Record one interaction so it can later be drawn in the sequence diagram.

    The record is appended to the module-level ``sequence_interactions`` list
    together with the time at which it was logged.

    Args:
        source (str): Component that initiates the interaction.
        target (str): Component that receives the interaction.
        message (str): Text describing the interaction.
        is_activation (bool, optional): Marks the start of an activation. Defaults to False.
        is_deactivation (bool, optional): Marks the end of an activation. Defaults to False.
        is_note (bool, optional): Marks the entry as a note rather than a message. Defaults to False.
        state_update (str, optional): Serialized state snapshot to attach. Defaults to None.

    Returns:
        None
    """
    entry = dict(
        source=source,
        target=target,
        message=message,
        is_activation=is_activation,
        is_deactivation=is_deactivation,
        is_note=is_note,
        state_update=state_update,
        timestamp=datetime.datetime.now().isoformat(),
    )
    sequence_interactions.append(entry)
def log_context_update(source, updates):
    """
    Log a snapshot of the shared context as a note for the sequence diagram.

    The snapshot contains every entry of the module-level ``context_variables``
    whose value is not None, except the ``query`` entry, formatted as
    ``key: value`` pairs joined by ``<br/>``.

    Args:
        source (str): The component reporting the update.
        updates (dict): The updates being made to the context. This argument
            is currently not read; the snapshot always reflects the whole
            context.

    Returns:
        None
    """
    snapshot_lines = []
    for key, value in context_variables.items():
        if value is None or key == 'query':
            continue
        snapshot_lines.append(f"{key}: {value}")
    state_msg = "<br/>".join(snapshot_lines)

    log_interaction(
        source=source,
        target="Context",
        message="Context Update",
        is_note=True,
        state_update=f"Current State:<br/>{state_msg}",
    )
def get_price(title, year):
    """
    Set the price for a movie, make sure a transaction exists, and mark the sale done.

    The price is a fixed DVD fee. A transaction id and timestamp are created
    only when the shared context does not already hold a transaction id; an
    existing transaction is reused, so the function can safely be called more
    than once.

    Args:
        title (str): The name of the movie
        year (str): The release year of the movie

    Returns:
        Result: A Result object that hands control back to the supervisor
        agent and carries the updated context
    """
    log_interaction("BuyMovie", "Context", f"Check title={title} & year={year}", is_activation=True)
    dvd_fee = "$19.99"
    console.print(f"The price for {title} first release in {year} is {dvd_fee}")
    context_variables["price"] = dvd_fee
    log_interaction("BuyMovie", "Context", f"Set price={dvd_fee}")

    if context_variables.get("transaction_id"):
        # A transaction already exists: keep it instead of issuing a new id.
        action = "Reuse"
    else:
        context_variables.update(initialize_transaction())
        action = "Generate"

    # Always read the transaction details back from the context so that the
    # logging below works whether or not a new transaction was just created.
    transaction_info = {
        "transaction_id": context_variables["transaction_id"],
        "timestamp": context_variables.get("timestamp"),
    }

    context_variables["sales_done"] = True
    log_interaction("BuyMovie", "Context", f"{action} transaction_id={transaction_info['transaction_id']}")
    log_interaction("BuyMovie", "Context", "Set sales_done=true")
    log_context_update("BuyMovie", transaction_info)
    console.print("Buying the movie completed")
    console.print(context_variables)
    log_interaction("BuyMovie", "Supervisor", "Done buying movie", is_deactivation=True)
    return Result(
        value="Done buying the movie",
        agent=supervisor_agent,
        context_variables=context_variables
    )
def generate_transaction_id():
    """
    Generate a unique transaction ID with DVD prefix.

    Returns:
        str: A transaction ID of the form 'DVD-<uuid4>', where the suffix is
        the canonical hyphenated text form of a random UUID
    """
    # An f-string already applies str() to the UUID, so no explicit conversion is needed.
    return f"DVD-{uuid.uuid4()}"
def initialize_transaction():
    """
    Build the tracking information for a new transaction.

    Returns:
        dict: A mapping with a freshly generated ``transaction_id`` and a
        ``timestamp`` holding the current local time in ISO 8601 format
    """
    transaction = {"transaction_id": generate_transaction_id()}
    transaction["timestamp"] = datetime.datetime.now().isoformat()
    return transaction
# Sales agent. Its prompt restricts it to a single action: calling get_price,
# which is also the only function it is given.
# NOTE(review): context_variables is passed as a constructor keyword here;
# whether Swarm's Agent actually consumes that keyword is not visible in this
# file -- confirm against the Swarm Agent definition.
buy_movie_agent = Agent(
    name="buy_movie_agent",
    model=model,
    instructions="""You are a sales agent. Your ONLY task is to get the price for a movie using the get_price function.
IMPORTANT: You must ALWAYS call get_price with the title and year provided in the context variables.
DO NOT proceed with any other actions.
DO NOT engage in conversation.
DO NOT ask questions.
JUST call get_price immediately.
Example system message you'll receive:
"Get price for movie: Napoleon (1922)"
Your response should be to call:
get_price(title="Napoleon", year="1922")""",
    context_variables=context_variables,
    functions=[get_price]
)
def process_shipping():
    """
    Process the shipping of a purchased movie.

    Shipping only happens when the shared context shows a completed sale, a
    price, and a shipping address. If any of these is missing, an error Result
    is returned instead of raising, and control goes back to the supervisor.

    Returns:
        Result: A Result object containing the shipping status (or an error
        message) and the updated context, handing control to the supervisor
        agent
    """
    log_interaction("ShipMovie", "Context", "Checking prerequisites", is_activation=True)

    if not context_variables.get("sales_done") or not context_variables.get("price"):
        log_interaction("ShipMovie", "Context", "Prerequisites check failed - Cannot ship before purchase", is_note=True)
        log_interaction("ShipMovie", "Supervisor", "Error: Cannot ship before purchase", is_deactivation=True)
        return Result(
            value="Error: Cannot ship before purchase is complete",
            agent=supervisor_agent,
            context_variables=context_variables
        )

    # "shipping_address" is not part of the initial context; it only exists
    # after ship_movie() has asked the user for it. Guard against its absence
    # rather than failing with a KeyError.
    shipping_address = context_variables.get("shipping_address")
    if not shipping_address:
        log_interaction("ShipMovie", "Context", "Prerequisites check failed - Missing shipping address", is_note=True)
        log_interaction("ShipMovie", "Supervisor", "Error: Missing shipping address", is_deactivation=True)
        return Result(
            value="Error: Cannot ship without a shipping address",
            agent=supervisor_agent,
            context_variables=context_variables
        )

    log_interaction("ShipMovie", "Context", "Prerequisites check passed", is_note=True)
    console.print(f"\n[cyan]Shipping movie {context_variables['title']} (ID: {context_variables['transaction_id']}) to {shipping_address}[/cyan]")
    context_variables["shipping_done"] = True
    log_interaction("ShipMovie", "Context", "Set shipping_done=true")
    log_context_update("ShipMovie", {"shipping_done": True})
    console.print("[green]Shipping completed [/green]")
    log_interaction("ShipMovie", "Supervisor", "Shipping completed", is_deactivation=True)
    return Result(
        value=f"Successfully shipped movie {context_variables['title']} to {shipping_address}",
        agent=supervisor_agent,
        context_variables=context_variables
    )
def ship_movie():
    """
    Start the shipping step and hand over to the shipping agent.

    The user is prompted for a shipping address, which is stored in the shared
    context under ``shipping_address``. The current transaction status is then
    displayed and the hand-off is logged for the sequence diagram.

    Returns:
        Result: A Result object that transfers control to ``ship_movie_agent``
        and carries the updated context
    """
    console.print("\n[bold cyan]Shipping Process:[/bold cyan]")
    console.print("supervisor_agent ===> shipping_agent \n")

    address = Prompt.ask("[bold cyan]Please enter your shipping address[/bold cyan]")
    context_variables["shipping_address"] = address
    console.print(f"[dim]User shipping address: {address}[/dim]")

    console.print("\n[cyan]Current Status:[/cyan]")
    display_transaction_status(context_variables)

    # Logged in order: the user's answer, the context write, then the hand-off
    # (only the hand-off opens an activation).
    logged_steps = (
        ("User", "Supervisor", f"Address: {address}", False),
        ("Supervisor", "Context", f"Set shipping_address={address}", False),
        ("Supervisor", "ShipMovie", "Process shipping", True),
    )
    for sender, receiver, text, activates in logged_steps:
        log_interaction(sender, receiver, text, is_activation=activates)

    console.print("\n[cyan]Processing shipping request...[/cyan]")
    handoff = Result(
        value="Transferring to shipping agent",
        agent=ship_movie_agent,
        context_variables=context_variables,
    )
    return handoff
def buy_movie(title, year):
    """
    Record the requested movie in the shared context and hand over to the sales agent.

    Args:
        title (str): Title of the movie to purchase
        year (str): Release year of the movie

    Returns:
        Result: A Result object that transfers control to ``buy_movie_agent``
        and carries the updated context
    """
    log_interaction("Supervisor", "BuyMovie", "Process purchase", is_activation=True)
    context_variables.update(title=title, year=year)
    log_interaction("Supervisor", "Context", f"Set title={title}, year={year}")

    handoff = Result(
        value=f"Processing purchase for {title} ({year})",
        agent=buy_movie_agent,
        context_variables=context_variables,
    )
    return handoff
def generate_sequence_diagram(interactions=None):
    """
    Generate a Mermaid sequence diagram showing the flow of interactions and context evolution.

    Args:
        interactions (list[dict], optional): Interaction records in the shape
            produced by log_interaction (keys 'source', 'target', 'message',
            and optionally 'is_activation', 'is_deactivation', 'is_note',
            'state_update'). Defaults to the module-level
            ``sequence_interactions`` list.

    Returns:
        str: The Mermaid diagram definition
    """
    if interactions is None:
        interactions = sequence_interactions

    # Map the names used when logging to the participants declared in the
    # diagram header. Full names are used as keys: keying on the first letter
    # cannot tell "Supervisor" and "ShipMovie" apart. Names without an entry
    # are passed through unchanged.
    participant_names = {
        "User": "User",
        "Supervisor": "Supervisor",
        "BuyMovie": "BuyAgent",
        "ShipMovie": "ShipAgent",
        "Context": "Context",
    }
    # Context fields whose changes are reported and summarized.
    # NOTE(review): 'payment_status' and 'shipping_status' are never set in the
    # shared context of this file (it uses 'sales_done' / 'shipping_done') --
    # confirm which names are intended.
    tracked_fields = (
        'title', 'year', 'price', 'transaction_id', 'shipping_address',
        'payment_status', 'shipping_status',
    )

    # Header with title and styling
    diagram = [
        "%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#ffffff', 'primaryTextColor': '#000000', 'primaryBorderColor': '#7C0000', 'lineColor': '#000000', 'secondaryColor': '#006100', 'tertiaryColor': '#fff', 'noteTextColor': '#000000', 'noteBorderColor': '#000000', 'noteBkgColor': '#fff5ad' }}}%%",
        "sequenceDiagram",
        "title Movie Purchase and Shipping Flow",
        "participant User",
        "participant Supervisor",
        "participant BuyAgent",
        "participant ShipAgent",
        "participant Context"
    ]

    last_state = {}
    for interaction in interactions:
        # Format the message for better readability
        message = interaction['message'].replace("=", ": ")
        source = participant_names.get(interaction['source'], interaction['source'])
        target = participant_names.get(interaction['target'], interaction['target'])

        # Parse the "key: value" snapshot attached to this interaction, if any,
        # and work out which tracked fields changed since the last snapshot.
        changes = {}
        state_update = interaction.get('state_update')
        if state_update:
            new_state = {}
            for line in state_update.split("<br/>"):
                key, separator, value = line.partition(": ")
                if separator:
                    new_state[key.strip()] = value.strip()
            changes = {
                k: v for k, v in new_state.items()
                if k in tracked_fields and (k not in last_state or last_state[k] != v)
            }
            last_state.update(new_state)

        # Add interactions with proper styling
        if interaction.get('is_note'):
            # Context snapshots are summarized at the end instead of being
            # drawn as individual notes.
            if not message.startswith(("Context Update", "State")):
                diagram.append(f"Note over {source}: {message}")
        elif interaction.get('is_activation'):
            diagram.append(f"{source}->>+{target}: {message}")
            # Show the context changes carried by this activation, if any
            if changes:
                change_msg = "<br>".join(f"{k}: {v}" for k, v in changes.items())
                diagram.append(f"Note over Context: Context Updated<br>{change_msg}")
        elif interaction.get('is_deactivation'):
            diagram.append(f"{source}-->>-{target}: {message}")
        else:
            # Plain messages are drawn with the same activating arrow as
            # explicit activations.
            diagram.append(f"{source}->>+{target}: {message}")

    # Add final transaction summary with all important context
    final_state = {k: last_state[k] for k in tracked_fields if k in last_state}
    if final_state:
        final_summary = "<br>".join(f"{k}: {v}" for k, v in final_state.items())
        diagram.append(f"Note over User,Context: Final Transaction State<br>{final_summary}")

    return "\n".join(diagram)
def save_diagrams():
    """
    Save the sequence diagram as a Mermaid file and render it to a timestamped PNG.

    The Mermaid source is always written to ``sequence_diagram.mmd`` (and so is
    overwritten on every run); only the PNG file name carries the timestamp.
    Rendering is delegated to the ``mmdc`` command-line tool.

    Raises:
        FileNotFoundError: If the ``mmdc`` executable cannot be found.
        subprocess.CalledProcessError: If ``mmdc`` exits with a non-zero status.
    """
    # Generate timestamp in the format YYYYMMDD_HHMMSS
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    mermaid_filename = "sequence_diagram.mmd"
    png_filename = f"sequence_diagram_{timestamp}.png"
    diagram_content = generate_sequence_diagram()

    # Save Mermaid file. The diagram may contain arbitrary user text (query,
    # address), so the encoding is fixed instead of platform-dependent.
    with open(mermaid_filename, "w", encoding="utf-8") as f:
        f.write(diagram_content)

    # Use mmdc to generate PNG with white background
    console.print(f"\n[cyan]Generating sequence diagram PNG with timestamp: {timestamp}...[/cyan]")
    subprocess.run(
        [
            "mmdc",
            "-i", mermaid_filename,
            "-o", png_filename,
            "-b", "white",  # Ensure white background
            "-t", "default",
        ],
        # Fail loudly on a rendering error instead of reporting success below.
        check=True,
    )
    console.print(f"[green]✓[/green] Sequence diagram saved as '{png_filename}'")
def display_transaction_status(context_variables):
    """
    Print the current transaction status as a Rich table.

    One row is shown per context entry whose value is not None; the ``query``
    entry is always left out.

    Args:
        context_variables (dict): The context variables to display

    Returns:
        None
    """
    status_table = Table(title="Transaction Status", box=box.ROUNDED)
    for heading, colour in (("Field", "cyan"), ("Value", "green")):
        status_table.add_column(heading, style=colour)

    for field, current in context_variables.items():
        if current is None or field == "query":
            continue
        status_table.add_row(field, str(current))

    console.print(status_table)
def run_with_logging(crew, model_override, agent, context_variables, messages, debug=False):
    """
    Run the swarm after logging the opening interaction for the sequence diagram.

    The content of the first message is logged as a User -> Supervisor
    interaction and a context snapshot is recorded before the run starts.

    Args:
        crew (Swarm): The Swarm instance to use
        model_override (str): The model to use for this run
        agent (Agent): The agent the run starts with
        context_variables (dict): Shared context between agents
        messages (list): Messages to process; the first one must have a 'content' entry
        debug (bool, optional): Whether to enable debug output. Defaults to False.

    Returns:
        Response: The response from the Swarm run
    """
    opening_message = messages[0]
    log_interaction("User", "Supervisor", opening_message['content'])
    log_context_update("System", {})
    console.print("\n[cyan]Processing your request...[/cyan]")

    run_options = dict(
        model_override=model_override,
        agent=agent,
        context_variables=context_variables,
        messages=messages,
        debug=debug,
    )
    return crew.run(**run_options)
# Shipping agent. Its prompt tells it to call process_shipping immediately,
# which is also the only function it is given. ship_movie() hands control to
# this agent after storing the address in the shared context.
# NOTE(review): context_variables is passed as a constructor keyword here;
# whether Swarm's Agent actually consumes that keyword is not visible in this
# file -- confirm against the Swarm Agent definition.
ship_movie_agent = Agent(
    name="ship_movie_agent",
    model=model,
    instructions="""You are a shipping agent. Your task is to ship the movie to the customer's address.
IMPORTANT:
- Only ship if the movie has been purchased (check sales_done and price in context_variables)
- You must call process_shipping to handle the shipping process
- DO NOT engage in conversation
- DO NOT ask for the address (it's already in context_variables['shipping_address'])
- JUST call process_shipping immediately""",
    context_variables=context_variables,
    functions=[process_shipping]
)
## Supervisor Agent
# Entry-point agent used by main(). It is given buy_movie and ship_movie, each
# of which hands control to a specialized agent; get_price and
# process_shipping hand control back to this agent when they finish.
# NOTE(review): context_variables is passed as a constructor keyword here;
# whether Swarm's Agent actually consumes that keyword is not visible in this
# file -- confirm against the Swarm Agent definition.
supervisor_agent = Agent(
    name="supervisor",
    model=model,
    instructions="""
GOAL:
You help the user buy a movie and ship it to their address.
FOLLOW THIS FLOW STRICTLY:
1. Extract the movie title and year from the query
2. Call buy_movie with the title and year
3. WAIT for the price to be computed (context_variables["price"] must not be None)
4. Only after the price is set, proceed with ship_movie
IMPORTANT:
- DO NOT call ship_movie until the price has been set
- Check context_variables["price"] before proceeding to shipping
- Only make one function call at a time
EXAMPLES:
User: "I want to buy the Matrix released in 1981"
Action: Call buy_movie with title="Matrix", year="1981"
Then: Wait for price to be set before calling ship_movie
""",
    context_variables=context_variables,
    functions=[buy_movie, ship_movie]
)
def main():
    """
    Run the interactive movie purchase flow from start to finish.

    Steps:
        1. Show the welcome banner
        2. Ask the user which movie to buy and store the request in the shared context
        3. Run the agents, starting with the supervisor
        4. Show the final status and save the sequence diagram

    A keyboard interrupt or any other exception raised after the banner is
    reported on the console instead of propagating.
    """
    banner = Panel(
        "[bold green]Welcome to the Movie Purchase System![/bold green]\n"
        "I'll help you purchase and ship your movie today.",
        border_style="green",
    )
    console.print(banner)

    try:
        query = Prompt.ask("\n[bold cyan]What movie would you like to buy?[/bold cyan]")
        context_variables["query"] = query

        console.print("\n[bold cyan]Initial Status:[/bold cyan]")
        display_transaction_status(context_variables)

        user_messages = [{"role": "user", "content": query}]
        response = run_with_logging(
            crew,
            model_override=model,
            agent=supervisor_agent,
            context_variables=context_variables,
            messages=user_messages,
            debug=False,
        )

        console.print("\n[bold cyan]Final Transaction Status:[/bold cyan]")
        display_transaction_status(response.context_variables)

        console.print("\n[cyan]Generating sequence diagram...[/cyan]")
        save_diagrams()
        console.print("\n[bold green] Transaction Complete! Your sequence diagram has been saved.[/bold green]")
    except KeyboardInterrupt:
        console.print("\n[bold red]Transaction cancelled by user.[/bold red]")
    except Exception as e:
        console.print(f"\n[bold red]Error: {str(e)}[/bold red]")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment