Created
December 23, 2024 14:05
-
-
Save ezynda3/02f8f36286c8ea96a6e5e6296bba80c2 to your computer and use it in GitHub Desktop.
Livekit Real-time audio echo back
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"io" | |
"log" | |
"net/http" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"sync" | |
msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/websocket/interfaces" | |
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" | |
listenclient "github.com/deepgram/deepgram-go-sdk/pkg/client/listen" | |
speak "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/rest" | |
speakclient "github.com/deepgram/deepgram-go-sdk/pkg/client/speak" | |
"github.com/joho/godotenv" | |
"github.com/livekit/protocol/auth" | |
"github.com/livekit/protocol/livekit" | |
lksdk "github.com/livekit/server-sdk-go/v2" | |
webrtc "github.com/pion/webrtc/v4" | |
) | |
type TranscriptionHandler struct { | |
room *lksdk.Room | |
openChan chan *msginterfaces.OpenResponse | |
messageChan chan *msginterfaces.MessageResponse | |
metadataChan chan *msginterfaces.MetadataResponse | |
speechStartedChan chan *msginterfaces.SpeechStartedResponse | |
utteranceEndChan chan *msginterfaces.UtteranceEndResponse | |
closeChan chan *msginterfaces.CloseResponse | |
errorChan chan *msginterfaces.ErrorResponse | |
unhandledChan chan *[]byte | |
} | |
func NewTranscriptionHandler(room *lksdk.Room) *TranscriptionHandler { | |
handler := &TranscriptionHandler{ | |
room: room, | |
openChan: make(chan *msginterfaces.OpenResponse), | |
messageChan: make(chan *msginterfaces.MessageResponse), | |
metadataChan: make(chan *msginterfaces.MetadataResponse), | |
speechStartedChan: make(chan *msginterfaces.SpeechStartedResponse), | |
utteranceEndChan: make(chan *msginterfaces.UtteranceEndResponse), | |
closeChan: make(chan *msginterfaces.CloseResponse), | |
errorChan: make(chan *msginterfaces.ErrorResponse), | |
unhandledChan: make(chan *[]byte), | |
} | |
go handler.Run() | |
return handler | |
} | |
func (h TranscriptionHandler) GetOpen() []*chan *msginterfaces.OpenResponse { | |
return []*chan *msginterfaces.OpenResponse{&h.openChan} | |
} | |
func (h TranscriptionHandler) GetMessage() []*chan *msginterfaces.MessageResponse { | |
return []*chan *msginterfaces.MessageResponse{&h.messageChan} | |
} | |
func (h TranscriptionHandler) GetMetadata() []*chan *msginterfaces.MetadataResponse { | |
return []*chan *msginterfaces.MetadataResponse{&h.metadataChan} | |
} | |
func (h TranscriptionHandler) GetSpeechStarted() []*chan *msginterfaces.SpeechStartedResponse { | |
return []*chan *msginterfaces.SpeechStartedResponse{&h.speechStartedChan} | |
} | |
func (h TranscriptionHandler) GetUtteranceEnd() []*chan *msginterfaces.UtteranceEndResponse { | |
return []*chan *msginterfaces.UtteranceEndResponse{&h.utteranceEndChan} | |
} | |
func (h TranscriptionHandler) GetClose() []*chan *msginterfaces.CloseResponse { | |
return []*chan *msginterfaces.CloseResponse{&h.closeChan} | |
} | |
func (h TranscriptionHandler) GetError() []*chan *msginterfaces.ErrorResponse { | |
return []*chan *msginterfaces.ErrorResponse{&h.errorChan} | |
} | |
func (h TranscriptionHandler) GetUnhandled() []*chan *[]byte { | |
return []*chan *[]byte{&h.unhandledChan} | |
} | |
func (h TranscriptionHandler) Run() error { | |
wg := sync.WaitGroup{} | |
// Handle connection open | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for range h.openChan { | |
log.Printf("π Deepgram connection opened") | |
} | |
}() | |
// Handle connection close | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for range h.closeChan { | |
log.Printf("β Deepgram connection closed") | |
} | |
}() | |
// Handle metadata updates | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for md := range h.metadataChan { | |
log.Printf( | |
"βΉοΈ Deepgram metadata: RequestID=%s, Channels=%d, Created=%s", | |
md.RequestID, | |
md.Channels, | |
md.Created, | |
) | |
} | |
}() | |
// Log when speech starts/ends | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for range h.speechStartedChan { | |
log.Printf("π£οΈ Speech detected") | |
} | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for range h.utteranceEndChan { | |
log.Printf("π Utterance completed") | |
} | |
}() | |
// Log errors | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for err := range h.errorChan { | |
log.Printf("β Deepgram error: Type=%s Message=%s Description=%s", | |
err.ErrCode, err.ErrMsg, err.Description) | |
} | |
}() | |
// Handle unhandled messages | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for msg := range h.unhandledChan { | |
log.Printf("β οΈ Unhandled message: %s", string(*msg)) | |
} | |
}() | |
// Handle transcription messages | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for mr := range h.messageChan { | |
if len(mr.Channel.Alternatives) == 0 { | |
log.Printf("β οΈ No transcription alternatives received") | |
continue | |
} | |
text := mr.Channel.Alternatives[0].Transcript | |
if text == "" { | |
log.Printf("β οΈ Empty transcript received") | |
continue | |
} | |
if mr.IsFinal { | |
log.Printf("π Final transcription: %s", text) | |
transcription := map[string]interface{}{ | |
"text": text, | |
"timestamp": time.Now(). | |
UnixNano() / | |
int64( | |
time.Millisecond, | |
), | |
} | |
// Handle LLM response | |
go handleLLMResponse(text, h.room) | |
jsonBytes, err := json.Marshal(transcription) | |
if err != nil { | |
log.Printf("β Failed to marshal transcription: %v", err) | |
continue | |
} | |
err = h.room.LocalParticipant.PublishData( | |
jsonBytes, | |
lksdk.WithDataPublishTopic("transcription"), | |
lksdk.WithDataPublishReliable(true)) | |
if err != nil { | |
log.Printf("β Failed to publish transcription: %v", err) | |
} else { | |
log.Printf("β Published transcription to room") | |
} | |
} else { | |
log.Printf("π Interim transcription: %s", text) | |
} | |
} | |
}() | |
wg.Wait() | |
return nil | |
} | |
// TokenResponse is the JSON body served by the /token endpoint.
type TokenResponse struct {
	// URL is the LiveKit server URL the client should connect to.
	URL string `json:"url"`
	// Token is the signed access token (JWT) for joining the room.
	Token string `json:"token"`
}
func handleLLMResponse(text string, room *lksdk.Room) { | |
// Wait 2 seconds before responding | |
time.Sleep(2 * time.Second) | |
// TODO: In the future, this will call the actual LLM | |
response := text | |
// Create TTS client without initialization | |
c := speakclient.NewREST( | |
os.Getenv("DEEPGRAM_API_KEY"), | |
&interfaces.ClientOptions{}, | |
) | |
dg := speak.New(c) | |
// Generate audio file | |
audioFile := "./response.ogg" | |
options := &interfaces.SpeakOptions{ | |
Model: "aura-athena-en", | |
Encoding: "opus", | |
BitRate: 32000, | |
} | |
_, err := dg.ToSave(context.Background(), audioFile, response, options) | |
if err != nil { | |
log.Printf("β Failed to generate TTS: %v", err) | |
return | |
} | |
// Create a LocalTrack from the audio file | |
track, err := lksdk.NewLocalFileTrack(audioFile, | |
lksdk.ReaderTrackWithMime(webrtc.MimeTypeOpus), | |
lksdk.ReaderTrackWithOnWriteComplete(func() { | |
log.Printf("β Finished playing TTS response") | |
// Clean up the temporary audio file | |
os.Remove(audioFile) | |
}), | |
) | |
if err != nil { | |
log.Printf("β Failed to create local track: %v", err) | |
return | |
} | |
// Publish the track to the room | |
_, err = room.LocalParticipant.PublishTrack( | |
track, | |
&lksdk.TrackPublicationOptions{ | |
Name: "agent-response", | |
Source: livekit.TrackSource_MICROPHONE, | |
}, | |
) | |
if err != nil { | |
log.Printf("β Failed to publish track: %v", err) | |
return | |
} | |
log.Printf("β Published TTS response: %s", response) | |
} | |
func main() { | |
// Load environment variables | |
if err := godotenv.Load(); err != nil { | |
log.Fatal("Error loading .env file") | |
} | |
url := os.Getenv("LIVEKIT_URL") | |
apiKey := os.Getenv("LIVEKIT_API_KEY") | |
apiSecret := os.Getenv("LIVEKIT_API_SECRET") | |
port := os.Getenv("PORT") | |
identity := os.Getenv("AGENT_IDENTITY") | |
if url == "" || apiKey == "" || apiSecret == "" { | |
log.Fatal( | |
"LIVEKIT_URL, LIVEKIT_API_KEY and LIVEKIT_API_SECRET are required", | |
) | |
} | |
if port == "" { | |
port = "8080" | |
} | |
if identity == "" { | |
identity = "demo-agent" | |
} | |
// Create a random room name | |
roomName := fmt.Sprintf("room-%d", time.Now().UnixNano()) | |
// Start HTTP server to provide tokens | |
http.HandleFunc("/token", func(w http.ResponseWriter, r *http.Request) { | |
// Create token for playground user | |
at := auth.NewAccessToken(apiKey, apiSecret) | |
grant := &auth.VideoGrant{ | |
RoomJoin: true, | |
Room: roomName, | |
} | |
at.AddGrant(grant). | |
SetIdentity("playground-user"). | |
SetValidFor(24 * time.Hour) | |
token, err := at.ToJWT() | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
resp := TokenResponse{ | |
URL: url, | |
Token: token, | |
} | |
w.Header().Set("Content-Type", "application/json") | |
w.Header().Set("Access-Control-Allow-Origin", "*") | |
json.NewEncoder(w).Encode(resp) | |
}) | |
go func() { | |
log.Printf("Token endpoint listening on :%s", port) | |
if err := http.ListenAndServe(":"+port, nil); err != nil { | |
log.Fatal(err) | |
} | |
}() | |
// Create room client | |
callback := lksdk.NewRoomCallback() | |
var room *lksdk.Room // Declare room here so it's accessible in the closure | |
// Add connection/disconnection callbacks | |
callback.OnParticipantConnected = func(participant *lksdk.RemoteParticipant) { | |
log.Printf( | |
"π’ Participant connected: %s (identity: %s)", | |
participant.Name(), | |
participant.Identity(), | |
) | |
} | |
callback.OnParticipantDisconnected = func(participant *lksdk.RemoteParticipant) { | |
log.Printf( | |
"π΄ Participant disconnected: %s (identity: %s)", | |
participant.Name(), | |
participant.Identity(), | |
) | |
} | |
callback.OnIsSpeakingChanged = func(p lksdk.Participant) { | |
if p.IsSpeaking() { | |
log.Printf("π€ Participant %s started speaking", p.Identity()) | |
} else { | |
log.Printf("π Participant %s stopped speaking", p.Identity()) | |
} | |
} | |
callback.ParticipantCallback.OnTrackSubscribed = func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { | |
// Skip if this is our own audio | |
if rp.Identity() == identity { | |
log.Printf("βοΈ Skipping own audio track") | |
return | |
} | |
// Skip if this isn't the playground user | |
if rp.Identity() != "playground-user" { | |
log.Printf( | |
"βοΈ Skipping audio from non-playground user: %s", | |
rp.Identity(), | |
) | |
return | |
} | |
if track.Kind() == webrtc.RTPCodecTypeAudio { | |
log.Printf( | |
"π§ Audio track subscribed from playground user: %s", | |
rp.Identity(), | |
) | |
// Get codec parameters | |
codec := track.Codec() | |
log.Printf("ποΈ Audio codec: %s, channels: %d, sample rate: %d", | |
codec.MimeType, codec.Channels, codec.ClockRate) | |
// Initialize Deepgram client | |
listenclient.Init(listenclient.InitLib{ | |
LogLevel: listenclient.LogLevelFull, // Increase logging | |
}) | |
deepgramKey := os.Getenv("DEEPGRAM_API_KEY") | |
if deepgramKey == "" { | |
log.Printf("β DEEPGRAM_API_KEY not set, skipping transcription") | |
return | |
} | |
// Set transcription options | |
tOptions := &interfaces.LiveTranscriptionOptions{ | |
Model: "nova-2", | |
Language: "en-US", | |
Punctuate: true, | |
Encoding: "opus", | |
Channels: 1, | |
SampleRate: 48000, | |
SmartFormat: true, | |
InterimResults: true, | |
VadEvents: true, | |
} | |
// Create callback handler | |
callback := NewTranscriptionHandler(room) | |
// Create Deepgram client | |
cOptions := &interfaces.ClientOptions{ | |
EnableKeepAlive: true, | |
} | |
dgClient, err := listenclient.NewWSUsingChan( | |
context.Background(), | |
deepgramKey, | |
cOptions, | |
tOptions, | |
callback, | |
) | |
if err != nil { | |
log.Printf("β Failed to create Deepgram client: %v", err) | |
return | |
} | |
// Connect to Deepgram | |
if !dgClient.Connect() { | |
log.Printf("β Failed to connect to Deepgram") | |
return | |
} | |
log.Printf("β Connected to Deepgram successfully") | |
// Start reading from the track | |
go func() { | |
defer dgClient.Stop() | |
packetCount := 0 | |
lastLog := time.Now() | |
for { | |
rtpPacket, _, err := track.ReadRTP() | |
if err != nil { | |
if err != io.EOF { | |
log.Printf("β Failed to read RTP packet: %v", err) | |
} | |
return | |
} | |
packetCount++ | |
if time.Since(lastLog) > time.Second { | |
log.Printf( | |
"π Processed %d audio packets in last second", | |
packetCount, | |
) | |
packetCount = 0 | |
lastLog = time.Now() | |
} | |
if _, err := dgClient.Write(rtpPacket.Payload); err != nil { | |
log.Printf("β Failed to send data to Deepgram: %v", err) | |
return | |
} | |
} | |
}() | |
} | |
} | |
room = lksdk.NewRoom(callback) | |
// Connect to room | |
log.Printf("Connecting to room %s as %s", roomName, identity) | |
c := lksdk.ConnectInfo{ | |
APIKey: apiKey, | |
APISecret: apiSecret, | |
RoomName: roomName, | |
ParticipantIdentity: identity, | |
ParticipantKind: lksdk.ParticipantAgent, | |
} | |
if err := room.Join(url, c); err != nil { | |
log.Fatal("failed to join room:", err) | |
} | |
defer room.Disconnect() | |
log.Printf("Connected to room %s as %s", roomName, identity) | |
// Wait for interrupt | |
sigChan := make(chan os.Signal, 1) | |
signal.Notify(sigChan, syscall.SIGINT) | |
<-sigChan | |
log.Println("Received interrupt signal, shutting down...") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment