Last active
March 13, 2025 13:03
-
-
Save navjotdhanawat/e94c77cddfb5d35b8acb2a62995f087b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import logging | |
from livekit import rtc, api | |
from livekit.agents import llm, JobContext | |
from typing import Annotated, Optional | |
from datetime import datetime | |
import aiohttp | |
logger = logging.getLogger("call-actions") | |
def create_dynamic_function(schema: dict):
    """Create an async method from a JSON-schema-like function description.

    The returned coroutine function takes ``self`` followed by one parameter
    per entry in ``schema["properties"]``. When called, it collects those
    arguments into a dict and, if the schema has a ``webhook_url``, POSTs the
    dict as JSON to that URL.

    The function is built as a closure with an explicit ``inspect.Signature``
    rather than by ``exec``-ing generated source, so schema text (property
    names, description) is never interpreted as Python code.

    Args:
        schema: Mapping with the optional keys ``name``, ``description``,
            ``properties`` (``{param_name: {"description": ...}}``) and
            ``webhook_url``.

    Returns:
        An ``async def`` function whose ``__name__``, ``__doc__``,
        ``__signature__`` and ``__annotations__`` reflect the schema. Calling
        it returns the webhook's decoded JSON body, ``{"error": ...}`` when
        the request raises or answers with a status >= 400, or the argument
        dict itself when no webhook is configured.

    Raises:
        ValueError: If a property name is not a valid Python parameter name
            or collides with ``self``.
        TypeError: If a property name is not a string.
    """
    # Local import: the module header does not import ``inspect``.
    import inspect

    webhook_url = schema.get("webhook_url")
    properties = schema.get("properties") or {}
    description = schema.get("description", "Dynamically created function")

    # Every parameter is exposed as an annotated string; the annotation
    # metadata carries the per-parameter description.
    annotations = {
        prop_name: Annotated[str, prop_schema.get("description", "")]
        for prop_name, prop_schema in properties.items()
    }

    # inspect.Parameter rejects names that are not plain identifiers, which is
    # what keeps hostile or malformed property names out of the signature.
    positional = inspect.Parameter.POSITIONAL_OR_KEYWORD
    parameters = [inspect.Parameter("self", positional)]
    parameters.extend(
        inspect.Parameter(prop_name, positional, annotation=annotation)
        for prop_name, annotation in annotations.items()
    )
    signature = inspect.Signature(parameters)

    async def dynamic_function(self, *args, **kwargs):
        # Enforce the declared signature (missing/unexpected arguments raise
        # TypeError, as they would for a hand-written ``def``).
        bound = signature.bind(self, *args, **kwargs)
        data = {prop_name: bound.arguments[prop_name] for prop_name in annotations}

        if not webhook_url:
            return data

        headers = {"Content-Type": "application/json"}
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    webhook_url, json=data, headers=headers
                ) as response:
                    if response.status >= 400:
                        logger.error(
                            "Webhook call failed: %s , %s",
                            response.status,
                            webhook_url,
                        )
                        return {
                            "error": f"Webhook call failed with status {response.status}"
                        }
                    return await response.json()
            except Exception as e:
                # Best-effort: report the failure to the caller instead of
                # raising out of the function call.
                logger.error("Error calling webhook: %s", e)
                return {"error": str(e)}

    # Expose the schema-derived metadata so introspection sees the declared
    # parameters rather than ``*args, **kwargs``.
    dynamic_function.__name__ = schema.get("name", "dynamic_function")
    dynamic_function.__doc__ = description
    dynamic_function.__signature__ = signature
    dynamic_function.__annotations__ = annotations
    return dynamic_function
class CallActions(llm.FunctionContext):
    """AI function calls for handling outbound calls.

    Holds the LiveKit API client, the remote participant, the room and the
    job context, and defines the coroutines an agent can run during a call.
    """

    def __init__(
        self,
        *,
        api: api.LiveKitAPI,
        participant: rtc.RemoteParticipant,
        room: rtc.Room,
        ctx: JobContext,
    ):
        """Store the handles the call actions operate on.

        Args:
            api: LiveKit API client used to remove the participant and delete
                the room.
            participant: The remote participant on the call.
            room: The room the call takes place in.
            ctx: The job context; shut down when the call ends.
        """
        super().__init__()
        self.api = api
        self.participant = participant
        self.room = room
        self.ctx = ctx

    async def hangup(self):
        """End the call: remove the participant, delete the room, shut down.

        Best-effort: any exception raised along the way is logged and
        swallowed, and the remaining steps are skipped.
        """
        try:
            await self.api.room.remove_participant(
                api.RoomParticipantIdentity(
                    room=self.room.name, identity=self.participant.identity
                )
            )
            await self.api.room.delete_room(api.DeleteRoomRequest(room=self.room.name))
            self.ctx.shutdown()
        except Exception as e:
            # A failed hangup is an error, not routine information: log it at
            # ERROR level with the traceback so it is not filtered out.
            logger.exception("Error while ending call: %s", e)

    async def end_call(self):
        """Called when the user wants to end the call."""
        logger.info(f"Ending call for {self.participant.identity}")
        await self.hangup()

    async def look_up_availability(self, date: Annotated[str, "Appointment date"]):
        """Checks alternative appointment availability.

        Returns a JSON string with a fixed list of available times after a
        two-second delay; ``date`` is only logged.
        """
        logger.info(f"Checking availability for {self.participant.identity} on {date}")
        await asyncio.sleep(2)
        return json.dumps({"available_times": ["10am", "2pm", "4pm"]})

    async def detected_answering_machine(
        self, reason: Annotated[str, "Reason of detecting answering machine"]
    ):
        """Ends call if it reaches voicemail."""
        logger.info(
            f"Detected answering machine for {self.participant.identity}, {reason}"
        )
        await self.hangup()

    async def get_time(self):
        """Called to retrieve the current local time (formatted HH:MM:SS)."""
        return datetime.now().strftime("%H:%M:%S")
def create_call_actions_class(
    enabled_functions: list, dynamic_schemas: Optional[dict] = None
):
    """Create a subclass of CallActions with the selected functions enabled.

    Each enabled predefined method and each dynamic function is passed through
    ``llm.ai_callable()`` and attached to a fresh ``DynamicCallActions``
    subclass, which is returned.

    Args:
        enabled_functions: List of predefined function names to enable.
            Names that do not exist on CallActions are skipped with a warning.
        dynamic_schemas: Dict of {function_name: schema_dict} for dynamic
            functions; each schema is turned into a method with
            ``create_dynamic_function`` and attached under ``function_name``.

    Returns:
        The new ``DynamicCallActions`` class (not an instance).
    """
    # Local imports: the module header does not import these.
    import functools
    import inspect
    import types

    def _clone_function(func):
        """Return an independent copy of a plain function."""
        clone = types.FunctionType(
            func.__code__,
            func.__globals__,
            name=func.__name__,
            argdefs=func.__defaults__,
            closure=func.__closure__,
        )
        clone.__kwdefaults__ = func.__kwdefaults__
        # Copies __doc__, annotations, __qualname__, __module__ and __dict__.
        functools.update_wrapper(clone, func)
        return clone

    class DynamicCallActions(CallActions):
        pass

    # Enable predefined functions
    for func_name in enabled_functions:
        if not hasattr(DynamicCallActions, func_name):
            logger.warning("Unknown predefined function %r; skipping", func_name)
            continue
        func = getattr(DynamicCallActions, func_name)
        # The attribute is inherited, i.e. it is the very function object that
        # lives on CallActions. Decorate a copy so that any marker the
        # decorator attaches does not end up on the shared base-class method
        # (and thereby on every other generated class).
        if inspect.isfunction(func):
            func = _clone_function(func)
        setattr(DynamicCallActions, func_name, llm.ai_callable()(func))

    # Create and add dynamic functions
    if dynamic_schemas:
        for func_name, schema in dynamic_schemas.items():
            dynamic_func = create_dynamic_function(schema)
            setattr(DynamicCallActions, func_name, llm.ai_callable()(dynamic_func))

    return DynamicCallActions
# Example usage of this utility.
# NOTE: `ctx` (the JobContext instance) and `participant` are not defined in
# this file; they are expected to come from the agent's job entrypoint.

# Predefined CallActions methods to enable for this agent.
enabled_functions = ["end_call"]

# Custom webhook-backed functions, one schema dict per function.
functions = [
    {
        "name": "check_appointment",
        "description": "Check appointment availability for a given date",
        "type": "object",
        "properties": {
            "appointment_date": {
                "type": "string",
                "description": "The date to check availability for (YYYY-MM-DD)",
            },
            "doctor_id": {
                "type": "string",
                "description": "Optional doctor ID to check availability for",
            },
        },
        "required": ["appointment_date"],
        "webhook_url": "http://localhost:3333/appointment",
    },
    {
        "name": "book_appointment",
        "description": "Book an appointment slot",
        "type": "object",
        "properties": {
            "slot_id": {
                "type": "string",
                "description": "The ID of the appointment slot to book",
            },
            "patient_name": {
                "type": "string",
                "description": "Name of the patient",
            },
        },
        "required": ["slot_id", "patient_name"],
        "webhook_url": "http://localhost:3333/book-appointment",
    },
]

# create_call_actions_class expects {function_name: schema_dict}.
dynamic_schemas = {schema["name"]: schema for schema in functions}

# Create call actions with both predefined and dynamic functions
DynamicCallActionsCls = create_call_actions_class(
    enabled_functions=enabled_functions, dynamic_schemas=dynamic_schemas
)
# Pass the JobContext *instance*: hangup() calls self.ctx.shutdown() on it.
call_actions = DynamicCallActionsCls(
    api=ctx.api, participant=participant, room=ctx.room, ctx=ctx
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment