Skip to content

Instantly share code, notes, and snippets.

@kwindla
Last active February 11, 2025 18:47
Show Gist options
  • Save kwindla/9184dc0f78a8c6efe8918c53f261975a to your computer and use it in GitHub Desktop.
Save kwindla/9184dc0f78a8c6efe8918c53f261975a to your computer and use it in GitHub Desktop.
Pipeline with lookup example
async def start_query_knowledge_base(function_name, llm, context):
    """Speak a short "please hold" message before the lookup runs.

    Pushes a ``TTSSpeakFrame`` through ``llm`` so the user hears something
    while a potentially slow LLM/function-call response is pending.

    Args:
        function_name: Name of the function being called (not used here).
        llm: Object exposing an awaitable ``push_frame`` method.
        context: Conversation context (not used here).
    """
    hold_message = TTSSpeakFrame("Please hold on while I look that order up for you.")
    await llm.push_frame(hold_message)
async def query_knowledge_base(function_name, tool_call_id, args, llm, context, result_callback):
    """Look up an order in ``knowledge_base`` and report it via ``result_callback``.

    The callback always receives a dict with a single ``"info"`` key whose
    value is a JSON-encoded string. On a hit the JSON object carries
    ``lookup_success: true`` plus ``order_status`` and ``delivery_date``;
    on a miss it carries only ``lookup_success: false``.

    Args:
        function_name: Name of the invoked function (not used here).
        tool_call_id: Identifier of the tool call (not used here).
        args: Function-call arguments; must contain a ``"query"`` key.
        llm: LLM service handling the call (not used here).
        context: Conversation context (not used here).
        result_callback: Awaitable callable that is given the result dict.

    Raises:
        KeyError: If ``args`` has no ``"query"`` entry, or a found record
            lacks ``"order_status"`` / ``"delivery_date"``.
    """
    query_result = knowledge_base.get(args["query"])

    if query_result is None:
        # NOTE(review): assumes knowledge_base.get() returns None on a miss
        # (as dict.get does) -- confirm against the real knowledge base.
        # Report the miss instead of crashing on None["order_status"], so the
        # callback is still invoked and the caller gets an answer.
        payload = {"lookup_success": False}
    else:
        payload = {
            "lookup_success": True,
            "order_status": query_result["order_status"],
            "delivery_date": query_result["delivery_date"],
        }

    await result_callback({"info": json.dumps(payload)})
# Register the lookup handler under the name "query_knowledge_base", with
# start_query_knowledge_base supplied as the start_callback.
llm.register_function(
    "query_knowledge_base",
    query_knowledge_base,
    start_callback=start_query_knowledge_base,
)
# Processors are listed in the order the frames flow through them.
pipeline = VoiceAgentPipeline(
    [
        # Audio in from user
        transport.input(),
        # Speech to text
        stt,
        # Manage 'user' context
        context_aggregator.user(),
        # LLM inference
        llm,
        # Text to speech
        tts,
        # Audio out to Network
        transport.output(),
        # Manage 'assistant' context
        context_aggregator.assistant(),
    ]
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment