Skip to content

Instantly share code, notes, and snippets.

@yirenlu92
Created April 9, 2025 18:09
Show Gist options
  • Save yirenlu92/336b9b96a4145eb2bcdf8e07ba92fedf to your computer and use it in GitHub Desktop.
Save yirenlu92/336b9b96a4145eb2bcdf8e07ba92fedf to your computer and use it in GitHub Desktop.
Running LiveKit agents on Modal
from modal import App, Image, Secret, fastapi_endpoint
from livekit.agents.worker import Worker, WorkerOptions
# Container image shared by every Modal function in this app: a slim Debian
# base plus the web endpoint stack, the LiveKit SDK/agents framework and the
# provider plugins used by the voice agent.
image = (
    Image.debian_slim()
    .pip_install(
        "fastapi[standard]",
        "aiohttp",
        "livekit>=1.0.1",
        "livekit-agents>=0.12.19",
        "livekit-plugins-openai>=0.10.17",
        "livekit-plugins-silero>=0.7.4",
        "livekit-plugins-cartesia==0.4.7",
        "livekit-plugins-deepgram==0.6.19",
        "python-dotenv~=1.0",
        "cartesia==2.0.0a0",
    )
)

app = App(name="livekit-example", image=image)

# These modules are installed in `image`. Importing them under
# `image.imports()` lets this file be loaded locally even when they are
# missing there, because Modal suppresses the ImportError outside the container.
with image.imports():
    from livekit import rtc
    from livekit.agents import AutoSubscribe, JobContext, llm
    from livekit.agents.pipeline import VoicePipelineAgent
    from livekit.plugins import openai, deepgram, silero, cartesia
async def livekit_entrypoint(ctx: JobContext):
    """Entrypoint the LiveKit worker runs for each dispatched job.

    Registers a shutdown hook that deletes this room's agent dispatches,
    connects to the room (auto-subscribing to audio only), waits for a
    participant to join, and starts the voice agent for that participant.

    Args:
        ctx: Job context supplied by the LiveKit agents framework.
    """
    print("Connecting to room", ctx.room.name)
    print(f"ctx.job.metadata: {ctx.job.metadata}")
    # Captured once for use by the shutdown hook below.
    my_room_name = ctx.room.name

    async def my_shutdown_hook():
        """Delete every agent dispatch that targets this room."""
        from livekit import api

        print(f"shutting down room {my_room_name}")
        lkapi = api.LiveKitAPI()
        try:
            dispatches = await lkapi.agent_dispatch.list_dispatch(
                room_name=my_room_name
            )
            for dispatch in dispatches:
                # delete_dispatch needs both the dispatch id and the name of
                # the room that holds it.
                await lkapi.agent_dispatch.delete_dispatch(dispatch.id, my_room_name)
        finally:
            # Release the API client's HTTP session even if a call failed.
            await lkapi.aclose()

    # Register cleanup before any await that may fail, hang or be cancelled
    # (connect / wait_for_participant), so the dispatch is always removed.
    ctx.add_shutdown_callback(my_shutdown_hook)

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    participant = await ctx.wait_for_participant()
    run_multimodal_agent(ctx, participant)
    print("Agent started")
def run_multimodal_agent(ctx: JobContext, participant: rtc.RemoteParticipant):
    """Start a voice agent in ``ctx.room`` that serves ``participant``.

    Despite the name, this assembles a ``VoicePipelineAgent`` from separate
    components (Silero VAD, Deepgram STT, OpenAI LLM, Cartesia TTS) and
    seeds it with a system prompt.
    """
    print("Starting multimodal agent")
    seed_context = llm.ChatContext().append(
        role="system",
        text="You are a voice assistant created by Modal. You answer questions and help with tasks.",
    )

    # Build each pipeline stage up front, in pipeline order.
    voice_activity = silero.VAD.load()
    transcriber = deepgram.STT(model="nova-2-general")
    chat_model = openai.LLM(model="gpt-4o-mini")
    synthesizer = cartesia.TTS()

    pipeline_agent = VoicePipelineAgent(
        vad=voice_activity,
        stt=transcriber,
        llm=chat_model,
        tts=synthesizer,
        chat_ctx=seed_context,
    )
    pipeline_agent.start(ctx.room, participant)
@app.function(
    # NOTE(review): nothing in this file visibly needs a GPU (the VAD is
    # loaded in-process; STT/LLM/TTS go through provider plugins). Confirm an
    # A100 is really required before paying for one per room.
    gpu="A100",
    scaledown_window=2,
    timeout=3000,
    secrets=[Secret.from_name("livekit-voice-agent")],
)
async def run_agent_worker(room_name: str):
    """Run a single-room LiveKit agent worker inside a Modal container.

    Starts a worker registered as ``agent-{room_name}`` and explicitly
    dispatches that agent to ``room_name``. It then waits (bounded) for the
    job to reach this worker, waits for that job to finish, and finally
    drains and closes the worker.

    Args:
        room_name: Name of the LiveKit room to dispatch the agent into; also
            used to build the per-room ``agent_name``.
    """
    import asyncio
    import os

    from livekit import api

    # Tunables for the startup/dispatch handshake (seconds).
    registration_delay_secs = 2.0  # fixed wait before dispatching
    job_start_timeout_secs = 60.0  # max wait for the dispatched job to arrive
    poll_interval_secs = 1.0

    agent_name = f"agent-{room_name}"
    print(f"Running worker for room {room_name}")

    def custom_load_fnc(worker: Worker):
        # Report load 1 while any job is active, 0 otherwise. Presumably this
        # keeps LiveKit from assigning a second job to this per-room worker.
        return 1 if worker.active_jobs else 0

    worker = Worker(
        WorkerOptions(
            entrypoint_fnc=livekit_entrypoint,
            ws_url=os.environ.get("LIVEKIT_URL"),
            load_fnc=custom_load_fnc,
            agent_name=agent_name,
        )
    )

    async def wait_for_agent_ready(delay_secs=registration_delay_secs):
        # Simple fixed delay so the worker can register before we dispatch.
        print(f"🕒 Waiting {delay_secs}s for agent to get ready...")
        await asyncio.sleep(delay_secs)

    async def create_explicit_dispatch():
        """Ask LiveKit to dispatch this worker's agent into ``room_name``."""
        lkapi = api.LiveKitAPI()
        try:
            dispatch = await lkapi.agent_dispatch.create_dispatch(
                api.CreateAgentDispatchRequest(
                    agent_name=agent_name,
                    room=room_name,
                    metadata="my_job_metadata",
                )
            )
            print("created dispatch", dispatch)
            dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
            print(f"there are {len(dispatches)} dispatches in {room_name}")
        finally:
            # Release the API client's HTTP session even if a call failed.
            await lkapi.aclose()

    worker_task = asyncio.create_task(worker.run())
    try:
        await wait_for_agent_ready()
        await create_explicit_dispatch()

        # The job is delivered asynchronously, so active_jobs is normally
        # still empty right after create_dispatch returns. Wait for the job to
        # arrive first; otherwise the "finish" loop below would exit at once
        # and the worker would be drained before the agent ever ran.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + job_start_timeout_secs
        while not worker.active_jobs and not worker_task.done():
            if loop.time() >= deadline:
                print(
                    f"No job received for room {room_name} "
                    f"within {job_start_timeout_secs}s"
                )
                break
            await asyncio.sleep(poll_interval_secs)

        # Wait for the worker to finish the job. Stop early if worker.run()
        # has exited, so the loop cannot spin forever on a dead worker.
        while worker.active_jobs and not worker_task.done():
            print(
                f"Waiting for worker to finish jobs (active_jobs={len(worker.active_jobs)})..."
            )
            await asyncio.sleep(poll_interval_secs)

        if not worker_task.done():
            await worker.drain()
    finally:
        # Cleanly shut down the worker on every path, then surface any error
        # raised by worker.run().
        await worker.aclose()
        await worker_task
@app.function(image=image)
@fastapi_endpoint(method="POST")
async def run_livekit_agent(request: dict):
    """Handle LiveKit room webhooks and start a per-room agent worker.

    On ``room_started``, a ``run_agent_worker`` call is spawned for the room.
    ``room_finished`` is only logged. Any other event is acknowledged and
    otherwise ignored.

    Args:
        request: Webhook JSON body parsed into a dict. Assumed to contain
            ``event`` and, for room events, a ``room`` object with ``name``
            (and ``sid``). NOTE(review): confirm against the LiveKit webhook
            payload schema.

    Returns:
        A small JSON-serialisable dict, which FastAPI sends with status 200.
        (An aiohttp ``web.Response`` is not a FastAPI/Starlette response, so
        returning one does not control the HTTP status.)
    """
    # NOTE(review): the webhook signature (Authorization header) is not
    # verified, so anyone who can reach this URL can spawn GPU workers.
    # Consider validating it with livekit.api.WebhookReceiver.
    print("request:", request)
    event = request.get("event")
    room = request.get("room") or {}
    # Dispatch/list APIs address rooms by *name*; the SID is a different
    # identifier and must not be used as the room name.
    room_name = room.get("name")

    if not room_name:
        print(f"Ignoring event {event!r} without a room name")
        return {"status": "ignored"}

    if event == "room_started":
        print(f"Room {room_name} started")
        # spawn() returns without waiting; the worker runs in its own call.
        run_agent_worker.spawn(room_name)
    elif event == "room_finished":
        # No action needed: the worker's polling loop ends once its job
        # finishes, and the job's shutdown hook deletes the room's dispatches.
        print(f"Room {room_name} finished")
    return {"status": "ok"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment