Created
October 16, 2025 05:05
-
-
Save BrianMwas/6e1bf6ac323570e07bf307680f68b439 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
| from livekit import api | |
| from livekit.agents import ( | |
| Agent, | |
| AgentSession, | |
| ChatContext, | |
| JobContext, | |
| JobProcess, | |
| RoomInputOptions, | |
| RoomOutputOptions, | |
| RunContext, | |
| WorkerOptions, | |
| cli, | |
| metrics, | |
| ) | |
| from livekit.agents.job import get_job_context | |
| from livekit.agents.llm import function_tool | |
| from livekit.agents.voice import MetricsCollectedEvent | |
| from livekit.plugins import deepgram, openai, silero | |
# Populate the process environment from the local env file before anything
# else runs (presumably the provider credentials the plugins read -- confirm
# against the deployment setup).
load_dotenv(dotenv_path=".env.local")

# One named logger shared by every agent and callback in this module.
logger = logging.getLogger("student-onboarding")
# Persona and response-style rules prepended to every agent's instructions.
# The pieces are concatenated verbatim, so each one (except the last) carries
# its own trailing space; agents append their role text after a further space.
common_instructions = (
    # Persona
    "You are a friendly and professional staff member at EduLearn, a leading video learning platform. "
    "You are warm, encouraging, and genuinely interested in helping students achieve their learning goals. "
    # Brevity rule
    "CRITICAL: Keep responses SHORT - 2-3 sentences maximum. Be conversational but concise. "
    # Tool etiquette: never narrate function calls to the student
    "IMPORTANT: When you use functions to record information, do NOT mention that you're calling functions "
    "or say things like 'setting name to Brian' or 'calling set_student_name'. "
    "Just have a natural conversation and use the functions silently in the background."
)
@dataclass
class CourseRecommendation:
    """One course suggestion recorded for the student.

    Attributes:
        title: The course title.
        duration_weeks: Expected duration in weeks.
        difficulty: One of 'beginner', 'intermediate', or 'advanced'.
        reason: Why this course is recommended for this student.
    """

    title: str
    duration_weeks: int
    difficulty: str
    reason: str
@dataclass
class StudentData:
    """Mutable onboarding state shared by all agents in one session.

    Each agent's tools write into this object, and ``summarize()`` renders it
    as a single line that is injected into the next agent's chat context.
    """

    # Shared data used across all agents
    name: Optional[str] = None
    current_skill_level: Optional[str] = None  # beginner, intermediate, advanced
    interests: list[str] = field(default_factory=list)
    learning_goals: list[str] = field(default_factory=list)
    available_hours_per_week: Optional[int] = None
    preferred_learning_style: Optional[str] = None  # visual, hands-on, reading, mixed

    # Set by Path Designer Agent
    # (project types are quoted: dataclasses never evaluate the annotation, so
    # the class does not depend on definition/import order)
    recommended_courses: list["CourseRecommendation"] = field(default_factory=list)
    learning_path: Optional[str] = None

    # Set by Enrollment Coordinator
    enrolled_courses: list[str] = field(default_factory=list)
    start_date: Optional[str] = None
    study_schedule: Optional[str] = None

    # Track previous agent for context continuity
    prev_agent: Optional["Agent"] = None

    def summarize(self) -> str:
        """Summarize student data for context transfer between agents.

        Returns:
            A single comma-separated line. Unset scalar fields are rendered as
            'unknown' and empty lists as 'none'.
        """
        # Compare against None explicitly: a recorded value of 0 hours is real
        # data and must not be reported as 'unknown'.
        hours = self.available_hours_per_week
        hours_text = "unknown" if hours is None else str(hours)

        interests_text = ", ".join(self.interests) if self.interests else "none"
        goals_text = ", ".join(self.learning_goals) if self.learning_goals else "none"
        enrolled_text = ", ".join(self.enrolled_courses) if self.enrolled_courses else "none"

        return (
            f"Student: {self.name or 'unknown'}, "
            f"Skill: {self.current_skill_level or 'unknown'}, "
            f"Interests: {interests_text}, "
            f"Goals: {goals_text}, "
            f"Hours/week: {hours_text}, "
            f"Recommended courses: {len(self.recommended_courses)}, "
            f"Enrolled: {enrolled_text}"
        )
class WelcomeAgent(Agent):
    """First agent of the onboarding flow.

    Greets the student, records their profile into the shared ``StudentData``
    through the tools below, and hands off to ``PathDesignerAgent`` once the
    required fields are present.

    NOTE: the tool docstrings are kept verbatim; they are presumably what the
    framework exposes to the LLM as tool descriptions, so treat them as
    runtime text.
    """

    def __init__(self) -> None:
        super().__init__(
            instructions=f"{common_instructions} You are the Welcome Agent. "
            "First message: Welcome briefly, then ask for ALL info in one go: name, skill level, "
            "topics of interest, learning goal, and hours per week available. Keep it to 3 sentences max. "
            "\n\nAfter they respond: "
            "1. Use your functions to record their information "
            "2. Check if you have: name AND skill_level AND at least one interest AND at least one goal AND hours_per_week "
            "3. If you have ALL of these, IMMEDIATELY call handoff_to_path_designer() "
            "4. If anything is missing, ask for only those missing items "
            "\n\nDo NOT have extra conversation once you have all required info - hand off right away. "
            "Keep ALL responses under 3 sentences.",
        )

    async def on_enter(self) -> None:
        """Start speaking as soon as this agent becomes active."""
        self.session.generate_reply()

    async def on_exit(self) -> None:
        """Called when leaving this agent - track for context continuity"""
        userdata: StudentData = self.session.userdata
        userdata.prev_agent = self
        logger.info("WelcomeAgent exiting, data collected: %s", userdata.summarize())

    @function_tool
    async def set_student_name(
        self,
        context: RunContext[StudentData],
        name: str,
    ):
        """Called when the student provides their name.

        Args:
            name: The student's name
        """
        context.userdata.name = name
        logger.info("Set student name: %s", name)

    @function_tool
    async def set_skill_level(
        self,
        context: RunContext[StudentData],
        skill_level: str,
    ):
        """Called when the student indicates their current skill level.

        Args:
            skill_level: One of 'beginner', 'intermediate', or 'advanced'
        """
        context.userdata.current_skill_level = skill_level
        logger.info("Set skill level: %s", skill_level)

    @function_tool
    async def add_interest(
        self,
        context: RunContext[StudentData],
        interest: str,
    ):
        """Called when the student mentions a topic they're interested in learning.

        Args:
            interest: A subject or topic area (e.g., 'web development', 'data science', 'digital marketing')
        """
        interests = context.userdata.interests
        # The model may repeat a tool call for the same topic; keep the list unique.
        if interest in interests:
            logger.info("Interest already recorded: %s", interest)
            return
        interests.append(interest)
        logger.info("Added interest: %s", interest)

    @function_tool
    async def add_learning_goal(
        self,
        context: RunContext[StudentData],
        goal: str,
    ):
        """Called when the student mentions a specific learning goal or outcome they want to achieve.

        Args:
            goal: A specific goal (e.g., 'get a job as a developer', 'start a freelance business', 'career change')
        """
        goals = context.userdata.learning_goals
        # Same de-duplication as for interests.
        if goal in goals:
            logger.info("Learning goal already recorded: %s", goal)
            return
        goals.append(goal)
        logger.info("Added learning goal: %s", goal)

    @function_tool
    async def set_available_hours(
        self,
        context: RunContext[StudentData],
        hours_per_week: int,
    ):
        """Called when the student indicates how many hours per week they can dedicate to learning.

        Args:
            hours_per_week: Number of hours available per week
        """
        context.userdata.available_hours_per_week = hours_per_week
        logger.info("Set available hours: %d per week", hours_per_week)

    @function_tool
    async def set_learning_style(
        self,
        context: RunContext[StudentData],
        style: str,
    ):
        """Called when the student mentions their preferred learning style.

        Args:
            style: One of 'visual', 'hands-on', 'reading', or 'mixed'
        """
        context.userdata.preferred_learning_style = style
        logger.info("Set learning style: %s", style)

    @function_tool
    async def handoff_to_path_designer(
        self,
        context: RunContext[StudentData],
    ):
        """Called when enough information has been gathered to create a personalized learning path.
        You MUST call this function as soon as you have: name, skill level, at least one interest,
        at least one goal, and available hours. Do not wait or have extra conversation.
        """
        data = context.userdata
        logger.info(
            "Handoff check - name: %s, skill: %s, interests: %s, goals: %s, hours: %s",
            data.name,
            data.current_skill_level,
            data.interests,
            data.learning_goals,
            data.available_hours_per_week,
        )

        # Validate we have minimum required info. Labels are the words spoken
        # back to the student; a falsy value counts as missing.
        required = {
            "name": data.name,
            "skill level": data.current_skill_level,
            "interests": data.interests,
            "learning goals": data.learning_goals,
            "available hours": data.available_hours_per_week,
        }
        missing = [label for label, value in required.items() if not value]
        if missing:
            logger.warning("Handoff attempted but missing: %s", missing)
            # Return only the message: no agent in the result means no handoff,
            # and the tool output stays a clean string instead of a sequence
            # that contains None.
            return f"I still need your {', '.join(missing)} before we can continue."

        # Create new agent - TTS will be initialized fresh. The conversation so
        # far is passed through the session's public history accessor.
        path_designer = PathDesignerAgent(chat_ctx=context.session.history)
        logger.info(
            "Handing off to Path Designer Agent with student data: %s", context.userdata
        )
        return path_designer, "Perfect! Let me connect you with our Path Designer."
class PathDesignerAgent(Agent):
    """Second agent of the onboarding flow.

    Recommends courses based on the collected ``StudentData``, records each
    recommendation and the overall learning path, then hands off to
    ``EnrollmentCoordinatorAgent``.

    NOTE: the tool docstrings are kept verbatim; they are presumably what the
    framework exposes to the LLM as tool descriptions, so treat them as
    runtime text.

    Args:
        chat_ctx: Optional conversation history to start from.
    """

    def __init__(self, chat_ctx: Optional[ChatContext] = None) -> None:
        super().__init__(
            instructions=f"{common_instructions} You are the Path Designer Agent. "
            "IMPORTANT: When you enter, the previous agent just handed off to you with a warm introduction. "
            "DON'T repeat their introduction or say 'let me help you' again. "
            "Jump straight into recommending courses based on what you know about them. "
            "\n\nRecommend 2-3 courses conversationally. Use the recommend_course function for EACH course. "
            "Speak naturally about why each course fits. After courses, call set_learning_path. "
            "Keep response under 4 sentences total. When ready, hand off to Enrollment Coordinator.",
            tts=openai.TTS(voice="echo"),
            chat_ctx=chat_ctx,
        )

    async def on_enter(self) -> None:
        """Called when this agent becomes active - ensures context continuity"""
        userdata: StudentData = self.session.userdata
        chat_ctx = self.chat_ctx.copy()

        # Add previous agent's conversation history
        prev_agent = userdata.prev_agent
        if isinstance(prev_agent, Agent):
            # Instructions are dropped, function calls kept, and the copy is
            # capped at 8 items (presumably the most recent ones -- confirm
            # against ChatContext.truncate).
            prev_ctx = prev_agent.chat_ctx.copy(
                exclude_instructions=True,
                exclude_function_call=False,
            ).truncate(max_items=8)
            # Skip items this agent already holds so nothing is duplicated.
            known_ids = {item.id for item in chat_ctx.items}
            new_items = [item for item in prev_ctx.items if item.id not in known_ids]
            chat_ctx.items.extend(new_items)
            logger.info(
                "PathDesigner: Added %d items from previous agent context", len(new_items)
            )

        # Add current student data as system context with clear directive
        chat_ctx.add_message(
            role="system",
            content="You are PathDesignerAgent. The WelcomeAgent just introduced you and handed off. "
            f"Student data: {userdata.summarize()}. "
            "Jump directly into course recommendations without re-introducing yourself.",
        )
        await self.update_chat_ctx(chat_ctx)
        self.session.generate_reply(tool_choice="auto")  # Start generating immediately

    async def on_exit(self) -> None:
        """Called when leaving this agent"""
        userdata: StudentData = self.session.userdata
        userdata.prev_agent = self
        logger.info(
            "PathDesignerAgent exiting, courses recommended: %d",
            len(userdata.recommended_courses),
        )

    @function_tool
    async def recommend_course(
        self,
        context: RunContext[StudentData],
        title: str,
        duration_weeks: int,
        difficulty: str,
        reason: str,
    ):
        """Called when recommending a specific course to the student.

        Args:
            title: The course title
            duration_weeks: Expected duration in weeks
            difficulty: One of 'beginner', 'intermediate', or 'advanced'
            reason: Why this course is recommended for this student
        """
        # No local try/except: the former handler only logged and re-raised,
        # so any error still propagates to the caller exactly as before.
        recommendation = CourseRecommendation(
            title=title,
            duration_weeks=duration_weeks,
            difficulty=difficulty,
            reason=reason,
        )
        context.userdata.recommended_courses.append(recommendation)
        logger.info(
            "Recommended course: %s (%s, %d weeks)", title, difficulty, duration_weeks
        )

    @function_tool
    async def set_learning_path(
        self,
        context: RunContext[StudentData],
        path_description: str,
    ):
        """Called when describing the overall learning path and sequence.

        Args:
            path_description: A description of the learning path, including course sequence and timeline
        """
        context.userdata.learning_path = path_description
        logger.info("Set learning path: %s", path_description)

    @function_tool
    async def handoff_to_enrollment_coordinator(
        self,
        context: RunContext[StudentData],
    ):
        """Called when the student is satisfied with the learning path and ready to enroll.
        Should only be called after recommending courses and describing the learning path.
        """
        if not context.userdata.recommended_courses:
            # Return only the message: no agent in the result means no handoff,
            # and the tool output stays a clean string instead of a sequence
            # that contains None.
            return "Let me recommend some courses first before we move to enrollment."

        # Create new agent - TTS will be initialized fresh. The conversation so
        # far is passed through the session's public history accessor.
        enrollment_coordinator = EnrollmentCoordinatorAgent(chat_ctx=context.session.history)
        logger.info(
            "Handing off to Enrollment Coordinator with recommendations: %s",
            context.userdata.recommended_courses,
        )
        return (
            enrollment_coordinator,
            "Excellent! Let me connect you with our Enrollment Coordinator who will help you get started.",
        )
class EnrollmentCoordinatorAgent(Agent):
    """Final agent of the onboarding flow.

    Records which courses the student enrolls in, their start date and study
    schedule, then says goodbye and deletes the room.

    NOTE: the tool docstrings are kept verbatim; they are presumably what the
    framework exposes to the LLM as tool descriptions, so treat them as
    runtime text.

    Args:
        chat_ctx: Optional conversation history to start from.
    """

    def __init__(self, chat_ctx: Optional[ChatContext] = None) -> None:
        super().__init__(
            instructions=f"{common_instructions} You are the Enrollment Coordinator. "
            "The Path Designer just handed off to you after recommending courses. "
            "DON'T re-introduce yourself or say 'let me help you enroll'. The handoff already said that. "
            "Jump straight into asking what they need: start date preference or confirm which courses. "
            "\n\nBe direct and efficient. Keep responses under 3 sentences. "
            "When done, quick congrats, mention email access, then complete onboarding.",
            tts=openai.TTS(voice="shimmer"),
            chat_ctx=chat_ctx,
        )

    async def on_enter(self) -> None:
        """Called when this agent becomes active - ensures context continuity"""
        userdata: StudentData = self.session.userdata
        chat_ctx = self.chat_ctx.copy()

        # Add previous agent's conversation history
        prev_agent = userdata.prev_agent
        if isinstance(prev_agent, Agent):
            # Instructions are dropped, function calls kept, and the copy is
            # capped at 8 items (presumably the most recent ones -- confirm
            # against ChatContext.truncate).
            prev_ctx = prev_agent.chat_ctx.copy(
                exclude_instructions=True,
                exclude_function_call=False,
            ).truncate(max_items=8)
            # Skip items this agent already holds so nothing is duplicated.
            known_ids = {item.id for item in chat_ctx.items}
            new_items = [item for item in prev_ctx.items if item.id not in known_ids]
            chat_ctx.items.extend(new_items)
            logger.info(
                "EnrollmentCoordinator: Added %d items from previous agent context",
                len(new_items),
            )

        # Add current student data with directive to jump in
        chat_ctx.add_message(
            role="system",
            content="You are EnrollmentCoordinatorAgent. PathDesigner just handed off after recommending courses. "
            f"Student data: {userdata.summarize()}. "
            "Skip introductions and jump directly into enrollment details.",
        )
        await self.update_chat_ctx(chat_ctx)
        self.session.generate_reply(tool_choice="auto")

    @function_tool
    async def enroll_in_course(
        self,
        context: RunContext[StudentData],
        course_title: str,
    ):
        """Called when the student confirms enrollment in a specific course.

        Args:
            course_title: The title of the course to enroll in
        """
        enrolled = context.userdata.enrolled_courses
        # The model may repeat a tool call for the same course; enroll once.
        if course_title in enrolled:
            logger.info("Student already enrolled in course: %s", course_title)
            return
        enrolled.append(course_title)
        logger.info("Enrolled student in course: %s", course_title)

    @function_tool
    async def set_start_date(
        self,
        context: RunContext[StudentData],
        start_date: str,
    ):
        """Called when the student chooses their start date.

        Args:
            start_date: The date they want to begin (e.g., 'next Monday', 'January 15th')
        """
        context.userdata.start_date = start_date
        logger.info("Set start date: %s", start_date)

    @function_tool
    async def create_study_schedule(
        self,
        context: RunContext[StudentData],
        schedule_description: str,
    ):
        """Called when creating a weekly study schedule for the student.

        Args:
            schedule_description: Description of the study schedule (days, times, duration)
        """
        context.userdata.study_schedule = schedule_description
        logger.info("Created study schedule: %s", schedule_description)

    @function_tool
    async def complete_onboarding(self, context: RunContext[StudentData]):
        """Called when enrollment is complete and the student is ready to start learning.
        This ends the onboarding session.
        """
        data = context.userdata

        # Refuse to end the session before anything was enrolled; otherwise the
        # farewell would announce an empty course list and the room would be
        # deleted with onboarding unfinished.
        if not data.enrolled_courses:
            logger.warning("complete_onboarding called with no enrolled courses")
            return (
                "The student is not enrolled in any course yet. "
                "Confirm at least one course before completing onboarding."
            )

        # Avoid announcing "starts None" when no start date was recorded.
        if data.start_date:
            start_clause = f"starts {data.start_date}"
        else:
            start_clause = "start date still to be confirmed"

        self.session.interrupt()

        # Generate final congratulations and next steps; awaited so the room is
        # only deleted after generate_reply has finished.
        await self.session.generate_reply(
            instructions=f"Brief congrats in 2 sentences: enrolled in {', '.join(data.enrolled_courses)}, "
            f"{start_clause}. Say check email for login.",
            allow_interruptions=False,
        )

        job_ctx = get_job_context()
        await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name))
def prewarm(proc: JobProcess):
    """Load per-process resources before a job is handled.

    Only the Silero VAD model is stored, under ``proc.userdata["vad"]``; that
    is the one prewarmed entry ``entrypoint`` reads.

    The LLM, STT and TTS clients are built per session in ``entrypoint`` and
    by the agents themselves. Instances previously stored here were never
    read, and the STT one used a different model ("nova-2-conversationalai")
    from the one the session runs with ("nova-3").

    Args:
        proc: The job process whose ``userdata`` mapping receives the VAD.
    """
    proc.userdata["vad"] = silero.VAD.load()
async def entrypoint(ctx: JobContext):
    """Run one onboarding session for the job described by ``ctx``.

    Connects the job context, builds an ``AgentSession`` around a fresh
    ``StudentData``, registers metrics logging plus a shutdown callback that
    logs the usage summary, and starts the session with ``WelcomeAgent``.

    Args:
        ctx: The job context supplying the room and the prewarmed VAD.
    """
    await ctx.connect()

    voice_session = AgentSession[StudentData](
        vad=ctx.proc.userdata["vad"],
        llm=openai.LLM(model="gpt-4o-mini"),
        stt=deepgram.STT(model="nova-3"),
        tts=openai.TTS(voice="ash"),
        userdata=StudentData(),
    )

    # Log metrics as they are emitted, and accumulate them for the final summary
    collector = metrics.UsageCollector()

    @voice_session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        collected = ev.metrics
        metrics.log_metrics(collected)
        collector.collect(collected)

    async def log_usage():
        logger.info(f"Usage: {collector.get_summary()}")

    ctx.add_shutdown_callback(log_usage)

    first_agent = WelcomeAgent()
    input_options = RoomInputOptions()
    output_options = RoomOutputOptions(transcription_enabled=True)
    await voice_session.start(
        agent=first_agent,
        room=ctx.room,
        room_input_options=input_options,
        room_output_options=output_options,
    )
# Script entry point: hand the worker configuration to the LiveKit CLI runner.
if __name__ == "__main__":
    worker_options = WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)
    cli.run_app(worker_options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment