Created
April 9, 2025 18:09
-
-
Save yirenlu92/336b9b96a4145eb2bcdf8e07ba92fedf to your computer and use it in GitHub Desktop.
running livekit agents on Modal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from modal import App, Image, Secret, fastapi_endpoint | |
from livekit.agents.worker import Worker, WorkerOptions | |
# Container image shared by every Modal function in this file.
image = Image.debian_slim().pip_install(
    "fastapi[standard]",
    "aiohttp",
    "livekit>=1.0.1",
    # The code in this file uses the 0.x agents API
    # (`livekit.agents.pipeline.VoicePipelineAgent`, `ChatContext().append`).
    # Cap the open-ended ranges below 1.0 so a fresh image build cannot
    # resolve to a release where those names are unavailable.
    "livekit-agents>=0.12.19,<1.0",
    "livekit-plugins-openai>=0.10.17,<1.0",
    "livekit-plugins-silero>=0.7.4,<1.0",
    "livekit-plugins-cartesia==0.4.7",
    "livekit-plugins-deepgram==0.6.19",
    "python-dotenv~=1.0",
    "cartesia==2.0.0a0",
)
# Modal application; functions registered on it default to the image above.
app = App("livekit-example", image=image)

# The LiveKit packages are installed in `image`, so they are imported under
# `image.imports()` rather than unconditionally at module import time.
with image.imports():
    from livekit import rtc
    from livekit.agents import AutoSubscribe, JobContext, llm
    from livekit.agents.pipeline import VoicePipelineAgent
    from livekit.plugins import openai, deepgram, silero, cartesia
async def livekit_entrypoint(ctx: JobContext):
    """Job entrypoint handed to the LiveKit worker (see `WorkerOptions`).

    Connects to the job's room subscribing to audio only, waits for a
    participant, registers a shutdown callback that deletes the agent
    dispatches of this room, and then starts the voice agent.

    Args:
        ctx: Job context supplied by the LiveKit worker for this job.
    """
    print("Connecting to room", ctx.room.name)
    print(f"ctx.job.metadata: {ctx.job.metadata}")

    # Captured now so the shutdown hook does not have to read it from `ctx`.
    my_room_name = ctx.room.name

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    participant = await ctx.wait_for_participant()

    async def my_shutdown_hook():
        """Delete every agent dispatch that is registered for this room."""
        from livekit import api

        print(f"shutting down room {my_room_name}")
        lkapi = api.LiveKitAPI()
        try:
            dispatches = await lkapi.agent_dispatch.list_dispatch(
                room_name=my_room_name
            )
            for dispatch in dispatches:
                # delete_dispatch needs the room name in addition to the
                # dispatch id.
                await lkapi.agent_dispatch.delete_dispatch(
                    dispatch.id, my_room_name
                )
        finally:
            # Close the API client even when listing or deleting fails.
            await lkapi.aclose()

    ctx.add_shutdown_callback(my_shutdown_hook)

    run_multimodal_agent(ctx, participant)
    print("Agent started")
def run_multimodal_agent(ctx: JobContext, participant: rtc.RemoteParticipant):
    """Build a `VoicePipelineAgent` and start it for one participant.

    The agent is configured with Silero VAD, Deepgram STT, an OpenAI LLM and
    Cartesia TTS, and is seeded with a single system message.

    Args:
        ctx: Job context whose `room` the agent is started in.
        participant: Remote participant the agent is started for.
    """
    print("Starting multimodal agent")

    system_prompt = "You are a voice assistant created by Modal. You answer questions and help with tasks."
    seeded_chat = llm.ChatContext().append(role="system", text=system_prompt)

    pipeline = VoicePipelineAgent(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-2-general"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=cartesia.TTS(),
        chat_ctx=seeded_chat,
    )
    pipeline.start(ctx.room, participant)
@app.function(
    gpu="A100",
    scaledown_window=2,
    timeout=3000,
    secrets=[Secret.from_name("livekit-voice-agent")],
)
async def run_agent_worker(room_name: str):
    """Run a LiveKit worker for one room and shut it down when its job ends.

    Starts a `Worker` registered under the agent name ``agent-<room_name>``,
    explicitly dispatches that agent to ``room_name``, waits for the
    dispatched job to start and then to finish, and finally drains and
    closes the worker.

    Args:
        room_name: Identifier of the room, used both as the dispatch target
            and as the suffix of the agent name.
    """
    import asyncio
    import os

    from livekit import api

    print(f"Running worker for room {room_name}")

    agent_name = f"agent-{room_name}"

    def custom_load_fnc(worker: Worker):
        # Report full load as soon as the worker has any active job.
        return 1 if worker.active_jobs else 0

    worker = Worker(
        WorkerOptions(
            entrypoint_fnc=livekit_entrypoint,
            ws_url=os.environ.get("LIVEKIT_URL"),
            load_fnc=custom_load_fnc,
            agent_name=agent_name,
        )
    )

    async def wait_for_agent_ready(delay_secs=2.0):
        print(f"🕒 Waiting {delay_secs}s for agent to get ready...")
        await asyncio.sleep(delay_secs)

    async def create_explicit_dispatch():
        """Explicitly dispatch the agent to the room and log the result."""
        lkapi = api.LiveKitAPI()
        try:
            dispatch = await lkapi.agent_dispatch.create_dispatch(
                api.CreateAgentDispatchRequest(
                    agent_name=agent_name,
                    room=room_name,
                    metadata="my_job_metadata",
                )
            )
            print("created dispatch", dispatch)
            dispatches = await lkapi.agent_dispatch.list_dispatch(
                room_name=room_name
            )
            print(f"there are {len(dispatches)} dispatches in {room_name}")
        finally:
            # Close the API client even if a request above fails.
            await lkapi.aclose()

    async def wait_for_job_start(timeout_secs=30.0, poll_secs=0.5):
        """Poll until the worker has an active job; return whether it does."""
        waited = 0.0
        while not worker.active_jobs and waited < timeout_secs:
            if worker_task.done():
                # The worker stopped on its own; no job can start anymore.
                break
            await asyncio.sleep(poll_secs)
            waited += poll_secs
        return bool(worker.active_jobs)

    worker_task = asyncio.create_task(worker.run())
    try:
        # Wait for the worker to start (simple fixed delay).
        await wait_for_agent_ready()

        await create_explicit_dispatch()

        # The dispatched job is not active the instant the dispatch is
        # created. Wait for it to show up first; otherwise the loop below
        # exits immediately and the worker is drained before it ever
        # receives the job.
        if not await wait_for_job_start():
            print(f"No job started for room {room_name}; shutting down worker")

        # Wait for the worker to finish processing its job(s).
        while worker.active_jobs:
            print(
                f"Waiting for worker to finish jobs (active_jobs={len(worker.active_jobs)})..."
            )
            await asyncio.sleep(1)

        # Cleanly shut down once there is nothing left to run.
        await worker.drain()
    finally:
        # Always release the worker, including when dispatching failed.
        await worker.aclose()
        await worker_task
@app.function(image=image)
@fastapi_endpoint(method="POST")
async def run_livekit_agent(request: dict):
    """Webhook endpoint that starts an agent worker when a room starts.

    Reads the ``event`` field of the posted JSON body. On ``room_started`` it
    spawns `run_agent_worker` for the room; ``room_finished`` is only logged.
    Every other event is acknowledged and ignored.

    Args:
        request: Parsed JSON body of the webhook POST.

    Returns:
        A small JSON-serialisable dict, which the endpoint sends back with
        its default success status.
    """
    # NOTE(review): the caller is not authenticated here, and each
    # `room_started` event spawns a GPU worker — consider verifying the
    # webhook signature before acting on the payload.
    print("request:", request)

    event = request.get("event")
    if event not in ("room_started", "room_finished"):
        # Other webhook events may not carry a "room" object at all.
        return {"status": "ignored"}

    # NOTE(review): this is the room's `sid`, although it is passed on as a
    # room name to the worker — confirm this is the intended identifier.
    room_name = (request.get("room") or {}).get("sid")
    if not room_name:
        print(f"Event {event} without room sid; ignoring")
        return {"status": "ignored"}

    if event == "room_started":
        print(f"Room {room_name} started")
        # Use the async variant so spawning does not block the event loop.
        await run_agent_worker.spawn.aio(room_name)
    else:
        print(f"Room {room_name} finished")
        # Nothing to do: the worker is expected to clean up after itself
        # once its job ends.

    # Return plain data instead of an aiohttp response object, which is not
    # a response type of the framework serving this endpoint.
    return {"status": "ok"}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment