OpenAI STT -> LLM -> TTS
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)


async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
    logger.info("Starting bot")

    # Alternative: connect directly to the browser over WebRTC. Left here but
    # disabled in favor of the Daily transport below.
    # xtransport = SmallWebRTCTransport(
    #     webrtc_connection=webrtc_connection,
    #     params=TransportParams(
    #         audio_in_enabled=True,
    #         audio_out_enabled=True,
    #         vad_analyzer=SileroVADAnalyzer(),
    #     ),
    # )

    transport = DailyTransport(
        os.getenv("DAILY_ROOM_URL"),
        os.getenv("DAILY_TOKEN"),
        "Respond bot",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            transcription_enabled=False,
            vad_audio_passthrough=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
    )

    stt = OpenAISTTService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-transcribe",
        prompt="Expect words related to dogs, such as breed names.",
    )

    tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"), voice="ballad")

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are very knowledgeable about dogs. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # STT
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            audio_out_sample_rate=24000,
            enable_metrics=True,
            enable_usage_metrics=True,
            report_only_initial_ttfb=True,
        ),
    )
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
# @transport.event_handler("on_client_closed")
# async def on_client_closed(transport, client):
# logger.info(f"Client closed connection")
# await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
# from run import main
# main()
asyncio.run(run_bot(None, None))
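
Note on running this as-is: invoking the file directly with python starts run_bot with the Daily transport (the SmallWebRTCTransport path and the commented-out run import at the bottom are alternate entry points). Because load_dotenv() is called, the simplest setup is a local .env file defining the three variables the script reads: DAILY_ROOM_URL (the Daily room to join), DAILY_TOKEN (a token for that room), and OPENAI_API_KEY (used by the STT, LLM, and TTS services).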