Skip to content

Instantly share code, notes, and snippets.

@kwindla
Created April 28, 2025 01:36
Show Gist options
  • Save kwindla/df4627935c4a1fa334b33c782483c14a to your computer and use it in GitHub Desktop.
Double transcription events test
# double transcription events
# pip install 'pipecat-ai[daily,silero,openai,cartesia]'==0.0.59 dotenv
#
# transcription events as expected
# pip install 'pipecat-ai[daily,silero,openai,cartesia]'==0.0.58 dotenv
import asyncio
import sys
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
# Load environment variables from .env; override=True lets .env values
# win over anything already set in the shell (useful for repro scripts).
load_dotenv(override=True)
# Replace loguru's default sink so everything at DEBUG and above goes to stderr.
logger.remove()
logger.add(sys.stderr, level="DEBUG")
async def main() -> None:
    """Join a Daily room and run a simple voice-bot pipeline.

    Repro script for comparing transcription events between pipecat
    0.0.58 and 0.0.59 (see the pip install notes at the top of the file).

    Reads from the environment:
        DAILY_ROOM_URL, DAILY_TOKEN   -- Daily room to join
        CARTESIA_API_KEY              -- TTS service credentials
        OPENAI_API_KEY                -- LLM service credentials
    """
    # Set up Daily transport with video/audio parameters
    transport = DailyTransport(
        os.getenv("DAILY_ROOM_URL"),
        os.getenv("DAILY_TOKEN"),
        "Transcription test",
        DailyParams(
            audio_out_enabled=True,  # Enable output audio for the bot
            transcription_enabled=True,  # Enable transcription for the user
            vad_enabled=True,  # Enable VAD to handle user speech
            vad_analyzer=SileroVADAnalyzer(),  # Use the Silero VAD analyzer
            vad_audio_passthrough=True,  # Pass audio through VAD for user speech to the rest of the pipeline
        ),
    )

    # Initialize text-to-speech service
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d",  # Movieman
    )

    # Initialize LLM service
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

    # Set up initial messages for the bot
    messages = [
        {
            "role": "system",
            "content": "You are Chatbot, a friendly, helpful robot. Start by introducing yourself.",
        },
    ]

    # Set up conversation context and management
    # The context_aggregator will automatically collect conversation context
    # Pass your initial messages and tools to the context to initialize the context
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    # Add your processors to the pipeline; order matters — frames flow
    # top-to-bottom from transport input to transport output.
    pipeline = Pipeline(
        [
            transport.input(),
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    # Create a PipelineTask to manage the pipeline
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        # observers=[RTVIObserver(rtvi)],
    )

    # Event handlers close over `task` / `context_aggregator`, so they must
    # be registered after the task is created.
    @transport.event_handler("on_participant_joined")
    async def on_participant_joined(transport, participant):
        logger.debug(f"Participant joined: {participant}")
        # Capture the first participant's transcription
        await transport.capture_participant_transcription(participant["id"])
        # Kick off the conversation by pushing a context frame to the pipeline
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        logger.debug(f"Participant left: {participant}")
        # Cancel the PipelineTask to stop processing
        await task.cancel()

    # Runs until the task is cancelled (participant leaves) or errors out.
    runner = PipelineRunner()
    await runner.run(task)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the bot; asyncio.run() re-raises
        # KeyboardInterrupt, which `except Exception` would NOT catch and
        # would otherwise dump a traceback. Exit quietly instead.
        logger.debug("Interrupted by user")
    except Exception as e:
        # logger.exception also records the full traceback.
        logger.exception(f"Error {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment