Skip to content

Instantly share code, notes, and snippets.

@ezynda3
Created December 23, 2024 14:05
Show Gist options
  • Save ezynda3/02f8f36286c8ea96a6e5e6296bba80c2 to your computer and use it in GitHub Desktop.
Save ezynda3/02f8f36286c8ea96a6e5e6296bba80c2 to your computer and use it in GitHub Desktop.
LiveKit real-time audio echo-back
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"sync"
msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/websocket/interfaces"
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
listenclient "github.com/deepgram/deepgram-go-sdk/pkg/client/listen"
speak "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/rest"
speakclient "github.com/deepgram/deepgram-go-sdk/pkg/client/speak"
"github.com/joho/godotenv"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
lksdk "github.com/livekit/server-sdk-go/v2"
webrtc "github.com/pion/webrtc/v4"
)
// TranscriptionHandler receives Deepgram live-transcription events over one
// channel per event type and relays final transcripts to a LiveKit room.
//
// The channels are created by NewTranscriptionHandler, exposed through the
// Get* methods, and drained by Run.
type TranscriptionHandler struct {
// room is the LiveKit room whose local participant publishes transcripts.
room *lksdk.Room
// openChan carries connection-open events (logged by Run).
openChan chan *msginterfaces.OpenResponse
// messageChan carries transcription results (interim and final).
messageChan chan *msginterfaces.MessageResponse
// metadataChan carries request metadata (logged by Run).
metadataChan chan *msginterfaces.MetadataResponse
// speechStartedChan carries speech-started events (logged by Run).
speechStartedChan chan *msginterfaces.SpeechStartedResponse
// utteranceEndChan carries utterance-end events (logged by Run).
utteranceEndChan chan *msginterfaces.UtteranceEndResponse
// closeChan carries connection-close events (logged by Run).
closeChan chan *msginterfaces.CloseResponse
// errorChan carries error events (logged by Run).
errorChan chan *msginterfaces.ErrorResponse
// unhandledChan carries raw messages that matched no other event type.
unhandledChan chan *[]byte
}
// NewTranscriptionHandler allocates a TranscriptionHandler bound to room,
// creates an unbuffered channel for every event type, and starts Run on a
// background goroutine before returning the handler.
func NewTranscriptionHandler(room *lksdk.Room) *TranscriptionHandler {
	th := new(TranscriptionHandler)
	th.room = room

	// All channels are unbuffered: a send blocks until Run's matching worker
	// receives the event.
	th.openChan = make(chan *msginterfaces.OpenResponse)
	th.messageChan = make(chan *msginterfaces.MessageResponse)
	th.metadataChan = make(chan *msginterfaces.MetadataResponse)
	th.speechStartedChan = make(chan *msginterfaces.SpeechStartedResponse)
	th.utteranceEndChan = make(chan *msginterfaces.UtteranceEndResponse)
	th.closeChan = make(chan *msginterfaces.CloseResponse)
	th.errorChan = make(chan *msginterfaces.ErrorResponse)
	th.unhandledChan = make(chan *[]byte)

	go th.Run()
	return th
}
// GetOpen returns a one-element slice holding a pointer to the handler's
// connection-open event channel.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetOpen() []*chan *msginterfaces.OpenResponse {
	ch := &h.openChan
	return []*chan *msginterfaces.OpenResponse{ch}
}
// GetMessage returns a one-element slice holding a pointer to the handler's
// transcription-result channel.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetMessage() []*chan *msginterfaces.MessageResponse {
	ch := &h.messageChan
	return []*chan *msginterfaces.MessageResponse{ch}
}
// GetMetadata returns a one-element slice holding a pointer to the handler's
// metadata event channel.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetMetadata() []*chan *msginterfaces.MetadataResponse {
	ch := &h.metadataChan
	return []*chan *msginterfaces.MetadataResponse{ch}
}
// GetSpeechStarted returns a one-element slice holding a pointer to the
// handler's speech-started event channel.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetSpeechStarted() []*chan *msginterfaces.SpeechStartedResponse {
	ch := &h.speechStartedChan
	return []*chan *msginterfaces.SpeechStartedResponse{ch}
}
// GetUtteranceEnd returns a one-element slice holding a pointer to the
// handler's utterance-end event channel.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetUtteranceEnd() []*chan *msginterfaces.UtteranceEndResponse {
	ch := &h.utteranceEndChan
	return []*chan *msginterfaces.UtteranceEndResponse{ch}
}
// GetClose returns a one-element slice holding a pointer to the handler's
// connection-close event channel.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetClose() []*chan *msginterfaces.CloseResponse {
	ch := &h.closeChan
	return []*chan *msginterfaces.CloseResponse{ch}
}
// GetError returns a one-element slice holding a pointer to the handler's
// error event channel.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetError() []*chan *msginterfaces.ErrorResponse {
	ch := &h.errorChan
	return []*chan *msginterfaces.ErrorResponse{ch}
}
// GetUnhandled returns a one-element slice holding a pointer to the handler's
// channel of raw, otherwise unhandled messages.
//
// The receiver is a copy, so the pointer addresses the copy's field; the
// channel it refers to is still the one Run drains.
func (h TranscriptionHandler) GetUnhandled() []*chan *[]byte {
	ch := &h.unhandledChan
	return []*chan *[]byte{ch}
}
// Run starts one worker goroutine per event channel owned by the handler and
// blocks until all of them have returned.
//
// Most events are only logged. Transcription results are passed to
// handleTranscript, which publishes final transcripts to the room.
//
// Each worker ranges over its channel and therefore exits only when that
// channel is closed. Nothing in this file closes the channels.
// NOTE(review): confirm whether the Deepgram client closes them on Stop;
// otherwise these goroutines (and Run itself) live until the process exits.
//
// Run always returns nil.
func (h TranscriptionHandler) Run() error {
	var wg sync.WaitGroup

	// spawn runs fn on its own goroutine and registers it with wg.
	spawn := func(fn func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fn()
		}()
	}

	// Connection lifecycle.
	spawn(func() {
		for range h.openChan {
			log.Printf("🔗 Deepgram connection opened")
		}
	})
	spawn(func() {
		for range h.closeChan {
			log.Printf("❌ Deepgram connection closed")
		}
	})

	// Request metadata.
	spawn(func() {
		for md := range h.metadataChan {
			if md == nil {
				continue
			}
			log.Printf(
				"ℹ️ Deepgram metadata: RequestID=%s, Channels=%d, Created=%s",
				md.RequestID,
				md.Channels,
				md.Created,
			)
		}
	})

	// Voice-activity events.
	spawn(func() {
		for range h.speechStartedChan {
			log.Printf("🗣️ Speech detected")
		}
	})
	spawn(func() {
		for range h.utteranceEndChan {
			log.Printf("🔇 Utterance completed")
		}
	})

	// Errors reported by Deepgram.
	spawn(func() {
		for e := range h.errorChan {
			if e == nil {
				continue
			}
			log.Printf("❌ Deepgram error: Type=%s Message=%s Description=%s",
				e.ErrCode, e.ErrMsg, e.Description)
		}
	})

	// Raw messages that matched no known event type.
	spawn(func() {
		for msg := range h.unhandledChan {
			if msg == nil {
				continue
			}
			log.Printf("⚠️ Unhandled message: %s", string(*msg))
		}
	})

	// Transcription results.
	spawn(func() {
		for mr := range h.messageChan {
			h.handleTranscript(mr)
		}
	})

	wg.Wait()
	return nil
}

// handleTranscript processes a single transcription result.
//
// Results with no alternatives or an empty transcript are logged and dropped.
// Interim results are only logged. A final result starts handleLLMResponse on
// a new goroutine and is published to the room as JSON with the keys "text"
// and "timestamp" (Unix milliseconds) on the "transcription" data topic using
// reliable delivery.
func (h TranscriptionHandler) handleTranscript(mr *msginterfaces.MessageResponse) {
	if mr == nil {
		return
	}
	if len(mr.Channel.Alternatives) == 0 {
		log.Printf("⚠️ No transcription alternatives received")
		return
	}

	text := mr.Channel.Alternatives[0].Transcript
	if text == "" {
		log.Printf("⚠️ Empty transcript received")
		return
	}

	if !mr.IsFinal {
		log.Printf("🔄 Interim transcription: %s", text)
		return
	}

	log.Printf("📝 Final transcription: %s", text)
	transcription := map[string]any{
		"text":      text,
		"timestamp": time.Now().UnixMilli(),
	}

	// The spoken reply is produced independently of the data publish below,
	// so a marshal or publish failure does not suppress it.
	go handleLLMResponse(text, h.room)

	payload, err := json.Marshal(transcription)
	if err != nil {
		log.Printf("❌ Failed to marshal transcription: %v", err)
		return
	}

	err = h.room.LocalParticipant.PublishData(
		payload,
		lksdk.WithDataPublishTopic("transcription"),
		lksdk.WithDataPublishReliable(true),
	)
	if err != nil {
		log.Printf("❌ Failed to publish transcription: %v", err)
		return
	}
	log.Printf("✅ Published transcription to room")
}
// TokenResponse is the JSON body returned by the /token HTTP endpoint.
type TokenResponse struct {
// URL is the LiveKit server URL (the LIVEKIT_URL setting) the client should connect to.
URL string `json:"url"`
// Token is the signed access-token JWT that lets the client join the room.
Token string `json:"token"`
}
// handleLLMResponse speaks a reply to text back into room.
//
// It waits two seconds, synthesizes the reply with Deepgram text-to-speech
// into a temporary Opus/Ogg file, and publishes that file to the room as an
// audio track named "agent-response". For now the reply is simply the input
// text echoed back (see the TODO below).
//
// The function is started on a fresh goroutine for every final transcript, so
// several invocations may overlap. Each one therefore writes to its own
// uniquely named temporary file, which is removed once playback finishes or
// as soon as any later step fails.
//
// Failures are logged; nothing is returned to the caller.
func handleLLMResponse(text string, room *lksdk.Room) {
	// Wait 2 seconds before responding.
	time.Sleep(2 * time.Second)

	// TODO: In the future, this will call the actual LLM.
	response := text

	// Create the TTS client.
	c := speakclient.NewREST(
		os.Getenv("DEEPGRAM_API_KEY"),
		&interfaces.ClientOptions{},
	)
	dg := speak.New(c)

	// Reserve a unique file name so concurrent responses cannot overwrite or
	// delete each other's audio. The .ogg suffix is kept from the original
	// fixed file name.
	tmp, err := os.CreateTemp("", "response-*.ogg")
	if err != nil {
		log.Printf("❌ Failed to create temporary audio file: %v", err)
		return
	}
	audioFile := tmp.Name()

	// removeAudio deletes the temporary file and reports unexpected failures.
	removeAudio := func() {
		if rmErr := os.Remove(audioFile); rmErr != nil && !os.IsNotExist(rmErr) {
			log.Printf("⚠️ Failed to remove %s: %v", audioFile, rmErr)
		}
	}

	// Only the name is needed; the TTS client writes the file by path.
	if err := tmp.Close(); err != nil {
		log.Printf("❌ Failed to close temporary audio file: %v", err)
		removeAudio()
		return
	}

	// Generate the audio file.
	options := &interfaces.SpeakOptions{
		Model:    "aura-athena-en",
		Encoding: "opus",
		BitRate:  32000,
	}
	if _, err := dg.ToSave(context.Background(), audioFile, response, options); err != nil {
		log.Printf("❌ Failed to generate TTS: %v", err)
		removeAudio()
		return
	}

	// Create a local track backed by the audio file.
	track, err := lksdk.NewLocalFileTrack(audioFile,
		lksdk.ReaderTrackWithMime(webrtc.MimeTypeOpus),
		lksdk.ReaderTrackWithOnWriteComplete(func() {
			log.Printf("✅ Finished playing TTS response")
			// Clean up the temporary audio file.
			removeAudio()
		}),
	)
	if err != nil {
		log.Printf("❌ Failed to create local track: %v", err)
		removeAudio()
		return
	}

	// Publish the track to the room.
	// NOTE(review): the publication is never unpublished after playback, so
	// each response leaves another "agent-response" track on the participant.
	_, err = room.LocalParticipant.PublishTrack(
		track,
		&lksdk.TrackPublicationOptions{
			Name:   "agent-response",
			Source: livekit.TrackSource_MICROPHONE,
		},
	)
	if err != nil {
		log.Printf("❌ Failed to publish track: %v", err)
		// NOTE(review): the track was never published, so its write-complete
		// callback is not expected to run; remove the file here instead.
		removeAudio()
		return
	}
	log.Printf("✅ Published TTS response: %s", response)
}
func main() {
// Load environment variables
if err := godotenv.Load(); err != nil {
log.Fatal("Error loading .env file")
}
url := os.Getenv("LIVEKIT_URL")
apiKey := os.Getenv("LIVEKIT_API_KEY")
apiSecret := os.Getenv("LIVEKIT_API_SECRET")
port := os.Getenv("PORT")
identity := os.Getenv("AGENT_IDENTITY")
if url == "" || apiKey == "" || apiSecret == "" {
log.Fatal(
"LIVEKIT_URL, LIVEKIT_API_KEY and LIVEKIT_API_SECRET are required",
)
}
if port == "" {
port = "8080"
}
if identity == "" {
identity = "demo-agent"
}
// Create a random room name
roomName := fmt.Sprintf("room-%d", time.Now().UnixNano())
// Start HTTP server to provide tokens
http.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) {
// Create token for playground user
at := auth.NewAccessToken(apiKey, apiSecret)
grant := &auth.VideoGrant{
RoomJoin: true,
Room: roomName,
}
at.AddGrant(grant).
SetIdentity("playground-user").
SetValidFor(24 * time.Hour)
token, err := at.ToJWT()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := TokenResponse{
URL: url,
Token: token,
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
json.NewEncoder(w).Encode(resp)
})
go func() {
log.Printf("Token endpoint listening on :%s", port)
if err := http.ListenAndServe(":"+port, nil); err != nil {
log.Fatal(err)
}
}()
// Create room client
callback := lksdk.NewRoomCallback()
var room *lksdk.Room // Declare room here so it's accessible in the closure
// Add connection/disconnection callbacks
callback.OnParticipantConnected = func(participant *lksdk.RemoteParticipant) {
log.Printf(
"🟒 Participant connected: %s (identity: %s)",
participant.Name(),
participant.Identity(),
)
}
callback.OnParticipantDisconnected = func(participant *lksdk.RemoteParticipant) {
log.Printf(
"πŸ”΄ Participant disconnected: %s (identity: %s)",
participant.Name(),
participant.Identity(),
)
}
callback.OnIsSpeakingChanged = func(p lksdk.Participant) {
if p.IsSpeaking() {
log.Printf("🎀 Participant %s started speaking", p.Identity())
} else {
log.Printf("πŸ›‘ Participant %s stopped speaking", p.Identity())
}
}
callback.ParticipantCallback.OnTrackSubscribed = func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
// Skip if this is our own audio
if rp.Identity() == identity {
log.Printf("⏭️ Skipping own audio track")
return
}
// Skip if this isn't the playground user
if rp.Identity() != "playground-user" {
log.Printf(
"⏭️ Skipping audio from non-playground user: %s",
rp.Identity(),
)
return
}
if track.Kind() == webrtc.RTPCodecTypeAudio {
log.Printf(
"🎧 Audio track subscribed from playground user: %s",
rp.Identity(),
)
// Get codec parameters
codec := track.Codec()
log.Printf("πŸŽ›οΈ Audio codec: %s, channels: %d, sample rate: %d",
codec.MimeType, codec.Channels, codec.ClockRate)
// Initialize Deepgram client
listenclient.Init(listenclient.InitLib{
LogLevel: listenclient.LogLevelFull, // Increase logging
})
deepgramKey := os.Getenv("DEEPGRAM_API_KEY")
if deepgramKey == "" {
log.Printf("❌ DEEPGRAM_API_KEY not set, skipping transcription")
return
}
// Set transcription options
tOptions := &interfaces.LiveTranscriptionOptions{
Model: "nova-2",
Language: "en-US",
Punctuate: true,
Encoding: "opus",
Channels: 1,
SampleRate: 48000,
SmartFormat: true,
InterimResults: true,
VadEvents: true,
}
// Create callback handler
callback := NewTranscriptionHandler(room)
// Create Deepgram client
cOptions := &interfaces.ClientOptions{
EnableKeepAlive: true,
}
dgClient, err := listenclient.NewWSUsingChan(
context.Background(),
deepgramKey,
cOptions,
tOptions,
callback,
)
if err != nil {
log.Printf("❌ Failed to create Deepgram client: %v", err)
return
}
// Connect to Deepgram
if !dgClient.Connect() {
log.Printf("❌ Failed to connect to Deepgram")
return
}
log.Printf("βœ… Connected to Deepgram successfully")
// Start reading from the track
go func() {
defer dgClient.Stop()
packetCount := 0
lastLog := time.Now()
for {
rtpPacket, _, err := track.ReadRTP()
if err != nil {
if err != io.EOF {
log.Printf("❌ Failed to read RTP packet: %v", err)
}
return
}
packetCount++
if time.Since(lastLog) > time.Second {
log.Printf(
"πŸ“Š Processed %d audio packets in last second",
packetCount,
)
packetCount = 0
lastLog = time.Now()
}
if _, err := dgClient.Write(rtpPacket.Payload); err != nil {
log.Printf("❌ Failed to send data to Deepgram: %v", err)
return
}
}
}()
}
}
room = lksdk.NewRoom(callback)
// Connect to room
log.Printf("Connecting to room %s as %s", roomName, identity)
c := lksdk.ConnectInfo{
APIKey: apiKey,
APISecret: apiSecret,
RoomName: roomName,
ParticipantIdentity: identity,
ParticipantKind: lksdk.ParticipantAgent,
}
if err := room.Join(url, c); err != nil {
log.Fatal("failed to join room:", err)
}
defer room.Disconnect()
log.Printf("Connected to room %s as %s", roomName, identity)
// Wait for interrupt
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)
<-sigChan
log.Println("Received interrupt signal, shutting down...")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment