Skip to content

Instantly share code, notes, and snippets.

@AngeloGiacco
Created June 11, 2025 23:32
Show Gist options
  • Save AngeloGiacco/334e3eea90b844c54ca291c7d74bf7bd to your computer and use it in GitHub Desktop.
convai_websocket
// Enhanced ElevenLabs WebSocket API implementation
// Emulates Swift SDK functionality and patterns
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp>
#include <nlohmann/json.hpp>
#include <algorithm>          // std::clamp (Conversation::setVolume)
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>            // std::rand (simulated volume)
#include <ctime>              // std::ctime (get_time tool in main)
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
using json = nlohmann::json;
using ws_client = websocketpp::client<websocketpp::config::asio_tls_client>;
namespace ElevenLabs {
// Enums matching Swift SDK
enum class Status {
CONNECTING,
CONNECTED,
DISCONNECTING,
DISCONNECTED
};
enum class Mode {
SPEAKING,
LISTENING
};
enum class Role {
USER,
AI
};
// Callback types matching Swift SDK functionality
struct Callbacks {
std::function<void(const std::string&)> onConnect = [](const std::string&) {};
std::function<void()> onDisconnect = []() {};
std::function<void(const std::string&, Role)> onMessage = [](const std::string&, Role) {};
std::function<void(const std::string&, const std::string&)> onError = [](const std::string&, const std::string&) {};
std::function<void(Status)> onStatusChange = [](Status) {};
std::function<void(Mode)> onModeChange = [](Mode) {};
std::function<void(float)> onVolumeUpdate = [](float) {};
std::function<void(float)> onOutputVolumeUpdate = [](float) {};
std::function<void(const std::string&, const std::string&, Role)> onMessageCorrection =
[](const std::string&, const std::string&, Role) {};
};
// Session configuration
struct SessionConfig {
std::string agentId;
std::string signedUrl;
json conversationConfigOverride;
json customLlmExtraBody;
json dynamicVariables;
SessionConfig(const std::string& id) : agentId(id) {}
SessionConfig() = default;
};
// Client tools support
class ClientTools {
public:
using ToolHandler = std::function<std::string(const json&)>;
void registerTool(const std::string& name, ToolHandler handler) {
std::lock_guard<std::mutex> lock(toolsMutex_);
tools_[name] = handler;
}
std::string handleTool(const std::string& name, const json& parameters) {
std::lock_guard<std::mutex> lock(toolsMutex_);
auto it = tools_.find(name);
if (it != tools_.end()) {
return it->second(parameters);
}
throw std::runtime_error("Tool not found: " + name);
}
private:
std::mutex toolsMutex_;
std::map<std::string, ToolHandler> tools_;
};
class Conversation {
public:
Conversation(const SessionConfig& config, const Callbacks& callbacks = Callbacks(),
std::shared_ptr<ClientTools> clientTools = nullptr)
: config_(config)
, callbacks_(callbacks)
, clientTools_(clientTools)
, status_(Status::CONNECTING)
, mode_(Mode::LISTENING)
, volume_(1.0f)
, isProcessingInput_(true)
, lastInterruptTimestamp_(0)
, connected_(false)
{
// Initialize WebSocket client
client_.init_asio();
// Configure TLS
client_.set_tls_init_handler([](websocketpp::connection_hdl) {
auto ctx = websocketpp::lib::make_shared<boost::asio::ssl::context>(
boost::asio::ssl::context::tlsv12_client
);
ctx->set_default_verify_paths();
return ctx;
});
// Set up event handlers
client_.set_open_handler([this](auto hdl) { onOpen(hdl); });
client_.set_message_handler([this](auto hdl, auto msg) { onMessage(hdl, msg); });
client_.set_close_handler([this](auto hdl) { onClose(hdl); });
client_.set_fail_handler([this](auto hdl) { onFail(hdl); });
}
~Conversation() {
endSession();
}
void startSession() {
updateStatus(Status::CONNECTING);
std::string url;
if (!config_.signedUrl.empty()) {
url = config_.signedUrl;
} else if (!config_.agentId.empty()) {
url = "wss://api.elevenlabs.io/v1/convai/conversation?agent_id=" + config_.agentId;
} else {
callbacks_.onError("Invalid configuration", "Missing agentId or signedUrl");
return;
}
websocketpp::lib::error_code ec;
auto con = client_.get_connection(url, ec);
if (ec) {
callbacks_.onError("Connection error", ec.message());
return;
}
hdl_ = con->get_handle();
client_.connect(con);
// Start the WebSocket thread
wsThread_ = std::thread([this]() {
try {
client_.run();
} catch (const std::exception& e) {
callbacks_.onError("WebSocket thread error", e.what());
}
});
}
void endSession() {
if (status_ == Status::CONNECTED) {
updateStatus(Status::DISCONNECTING);
if (connected_) {
client_.close(hdl_, websocketpp::close::status::normal, "Session ended");
}
if (wsThread_.joinable()) {
wsThread_.join();
}
updateStatus(Status::DISCONNECTED);
callbacks_.onDisconnect();
}
}
// Audio input methods
void sendAudio(const std::string& base64Audio) {
if (!isProcessingInput_) return;
json message = {
{"type", "user_audio_chunk"},
{"user_audio_chunk", base64Audio}
};
sendMessage(message);
}
void startRecording() {
isProcessingInput_ = true;
}
void stopRecording() {
isProcessingInput_ = false;
}
// Message sending methods
void sendUserMessage(const std::string& text = "") {
json message = {{"type", "user_message"}};
if (!text.empty()) {
message["text"] = text;
}
sendMessage(message);
}
void sendContextualUpdate(const std::string& text) {
json message = {
{"type", "contextual_update"},
{"text", text}
};
sendMessage(message);
}
void sendUserActivity() {
json message = {{"type", "user_activity"}};
sendMessage(message);
}
// Getters
std::string getConversationId() const {
std::lock_guard<std::mutex> lock(dataMutex_);
return conversationId_;
}
float getVolume() const {
return volume_;
}
void setVolume(float volume) {
volume_ = std::clamp(volume, 0.0f, 1.0f);
// In real implementation, this would control audio output volume
}
Status getStatus() const {
return status_;
}
Mode getMode() const {
return mode_;
}
private:
void onOpen(websocketpp::connection_hdl hdl) {
connected_ = true;
std::cout << "βœ… Connected to ElevenLabs\n";
// Send initialization message
json initMessage = {{"type", "conversation_initiation_client_data"}};
// Add configuration overrides
if (!config_.conversationConfigOverride.empty()) {
initMessage["conversation_config_override"] = config_.conversationConfigOverride;
}
if (!config_.customLlmExtraBody.empty()) {
initMessage["custom_llm_extra_body"] = config_.customLlmExtraBody;
}
if (!config_.dynamicVariables.empty()) {
initMessage["dynamic_variables"] = config_.dynamicVariables;
}
sendMessage(initMessage);
// Wait for conversation metadata
receiveMessages();
}
void onClose(websocketpp::connection_hdl hdl) {
connected_ = false;
updateStatus(Status::DISCONNECTED);
std::cout << "❌ Disconnected\n";
callbacks_.onDisconnect();
}
void onFail(websocketpp::connection_hdl hdl) {
connected_ = false;
updateStatus(Status::DISCONNECTED);
auto con = client_.get_con_from_hdl(hdl);
std::string error = "Connection failed: " + std::to_string(con->get_response_code()) +
" " + con->get_response_msg();
callbacks_.onError("Connection failed", error);
}
void onMessage(websocketpp::connection_hdl hdl, ws_client::message_ptr msg) {
try {
json data = json::parse(msg->get_payload());
const std::string type = data["type"];
if (type == "conversation_initiation_metadata") {
handleInitiationMetadata(data);
}
else if (type == "ping") {
handlePing(data);
}
else if (type == "user_transcript") {
handleUserTranscript(data);
}
else if (type == "agent_response") {
handleAgentResponse(data);
}
else if (type == "agent_response_correction") {
handleAgentResponseCorrection(data);
}
else if (type == "audio") {
handleAudio(data);
}
else if (type == "interruption") {
handleInterruption(data);
}
else if (type == "client_tool_call") {
handleClientToolCall(data);
}
else if (type == "internal_tentative_agent_response") {
// Ignore internal messages
}
else if (type == "internal_vad_score") {
// Ignore internal messages
}
else if (type == "internal_turn_probability") {
// Ignore internal messages
}
else {
std::cout << "⚠️ Unknown message type: " << type << std::endl;
}
}
catch (const std::exception& e) {
callbacks_.onError("Message parsing error", e.what());
}
}
void handleInitiationMetadata(const json& data) {
auto metadata = data["conversation_initiation_metadata_event"];
{
std::lock_guard<std::mutex> lock(dataMutex_);
conversationId_ = metadata["conversation_id"];
}
updateStatus(Status::CONNECTED);
callbacks_.onConnect(conversationId_);
std::cout << "🎯 Conversation ID: " << conversationId_ << std::endl;
// Extract audio format info if available
if (metadata.contains("agent_output_audio_format")) {
std::string audioFormat = metadata["agent_output_audio_format"];
std::cout << "πŸ”Š Audio format: " << audioFormat << std::endl;
}
}
void handlePing(const json& data) {
auto pingEvent = data["ping_event"];
int eventId = pingEvent["event_id"];
// Optional delay
if (pingEvent.contains("ping_ms")) {
int delayMs = pingEvent["ping_ms"];
if (delayMs > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
}
}
json pong = {
{"type", "pong"},
{"event_id", eventId}
};
sendMessage(pong);
}
void handleUserTranscript(const json& data) {
auto event = data["user_transcription_event"];
std::string transcript = event["user_transcript"];
std::cout << "πŸ“ You said: " << transcript << std::endl;
callbacks_.onMessage(transcript, Role::USER);
}
void handleAgentResponse(const json& data) {
auto event = data["agent_response_event"];
std::string response = event["agent_response"];
std::cout << "πŸ€– Agent: " << response << std::endl;
callbacks_.onMessage(response, Role::AI);
}
void handleAgentResponseCorrection(const json& data) {
auto event = data["agent_response_correction_event"];
std::string original = event["original_agent_response"];
std::string corrected = event["corrected_agent_response"];
std::cout << "πŸ”„ Correction: '" << original << "' -> '" << corrected << "'" << std::endl;
callbacks_.onMessageCorrection(original, corrected, Role::AI);
}
void handleAudio(const json& data) {
auto event = data["audio_event"];
std::string audioBase64 = event["audio_base_64"];
int eventId = event["event_id"];
// Check if this audio should be played (not interrupted)
if (lastInterruptTimestamp_ <= eventId) {
std::cout << "πŸ”Š Audio received (event " << eventId << ", "
<< audioBase64.length() << " chars)\n";
updateMode(Mode::SPEAKING);
// In real implementation: decode base64, convert to audio format, and play
// For now, simulate audio processing
processAudioData(audioBase64);
}
}
void handleInterruption(const json& data) {
auto event = data["interruption_event"];
int eventId = event["event_id"];
std::string reason = event.value("reason", "unknown");
lastInterruptTimestamp_ = eventId;
std::cout << "⚠️ Interrupted (event " << eventId << "): " << reason << std::endl;
updateMode(Mode::LISTENING);
// In real implementation: stop audio playback, clear buffers
clearAudioBuffers();
}
void handleClientToolCall(const json& data) {
if (!clientTools_) {
callbacks_.onError("Client tool call received but no tools registered", "");
return;
}
auto toolCall = data["client_tool_call"];
std::string toolName = toolCall["tool_name"];
std::string toolCallId = toolCall["tool_call_id"];
json parameters = toolCall["parameters"];
std::cout << "πŸ”§ Tool call: " << toolName << std::endl;
// Execute tool in separate thread to avoid blocking
std::thread([this, toolName, toolCallId, parameters]() {
try {
std::string result = clientTools_->handleTool(toolName, parameters);
json response = {
{"type", "client_tool_result"},
{"tool_call_id", toolCallId},
{"result", result},
{"is_error", false}
};
sendMessage(response);
}
catch (const std::exception& e) {
json response = {
{"type", "client_tool_result"},
{"tool_call_id", toolCallId},
{"result", e.what()},
{"is_error", true}
};
sendMessage(response);
}
}).detach();
}
void sendMessage(const json& message) {
if (!connected_) return;
std::string payload = message.dump();
websocketpp::lib::error_code ec;
client_.send(hdl_, payload, websocketpp::frame::opcode::text, ec);
if (ec) {
callbacks_.onError("Send error", ec.message());
}
}
void receiveMessages() {
// This method is called once after connection is established
// The actual message receiving is handled by the WebSocket event handlers
}
void updateStatus(Status newStatus) {
if (status_ != newStatus) {
status_ = newStatus;
callbacks_.onStatusChange(newStatus);
}
}
void updateMode(Mode newMode) {
if (mode_ != newMode) {
mode_ = newMode;
callbacks_.onModeChange(newMode);
}
}
void processAudioData(const std::string& base64Audio) {
// Simulate audio processing and volume calculation
// In real implementation: decode base64 -> PCM -> play audio
// Simulate volume update
float simulatedVolume = 0.5f + (rand() % 100) / 200.0f; // Random 0.5-1.0
callbacks_.onOutputVolumeUpdate(simulatedVolume);
// Simulate speaking mode duration
std::thread([this]() {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
updateMode(Mode::LISTENING);
}).detach();
}
void clearAudioBuffers() {
// In real implementation: clear audio playback buffers
std::cout << "🧹 Clearing audio buffers\n";
}
// Member variables
SessionConfig config_;
Callbacks callbacks_;
std::shared_ptr<ClientTools> clientTools_;
ws_client client_;
websocketpp::connection_hdl hdl_;
std::thread wsThread_;
mutable std::mutex dataMutex_;
std::string conversationId_;
std::atomic<Status> status_;
std::atomic<Mode> mode_;
std::atomic<float> volume_;
std::atomic<bool> isProcessingInput_;
std::atomic<int> lastInterruptTimestamp_;
std::atomic<bool> connected_;
};
} // namespace ElevenLabs
// Example usage demonstrating Swift SDK patterns
int main() {
std::cout << "Enhanced ElevenLabs C++ Client\n";
std::cout << "============================\n\n";
// Create session config (matching Swift SDK patterns)
ElevenLabs::SessionConfig config("your-agent-id-here");
// Configure callbacks (matching Swift SDK)
ElevenLabs::Callbacks callbacks;
callbacks.onConnect = [](const std::string& conversationId) {
std::cout << "πŸŽ‰ Connected! Conversation ID: " << conversationId << std::endl;
};
callbacks.onDisconnect = []() {
std::cout << "πŸ‘‹ Disconnected!" << std::endl;
};
callbacks.onMessage = [](const std::string& message, ElevenLabs::Role role) {
std::string roleStr = (role == ElevenLabs::Role::USER) ? "You" : "Agent";
std::cout << "πŸ’¬ " << roleStr << ": " << message << std::endl;
};
callbacks.onStatusChange = [](ElevenLabs::Status status) {
const char* statusStr[] = {"CONNECTING", "CONNECTED", "DISCONNECTING", "DISCONNECTED"};
std::cout << "πŸ“Š Status: " << statusStr[static_cast<int>(status)] << std::endl;
};
callbacks.onModeChange = [](ElevenLabs::Mode mode) {
const char* modeStr[] = {"SPEAKING", "LISTENING"};
std::cout << "πŸŽ™οΈ Mode: " << modeStr[static_cast<int>(mode)] << std::endl;
};
callbacks.onVolumeUpdate = [](float volume) {
// Input volume updates (less frequent logging)
static int counter = 0;
if (++counter % 50 == 0) { // Log every 50th update
std::cout << "🎚️ Input Volume: " << volume << std::endl;
}
};
callbacks.onError = [](const std::string& error, const std::string& details) {
std::cout << "❌ Error: " << error << " - " << details << std::endl;
};
// Create and register client tools
auto clientTools = std::make_shared<ElevenLabs::ClientTools>();
clientTools->registerTool("get_time", [](const json& params) -> std::string {
auto now = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(now);
return std::string("Current time: ") + std::ctime(&time_t);
});
// Create conversation
ElevenLabs::Conversation conversation(config, callbacks, clientTools);
// Start session
conversation.startSession();
// Simulate usage
std::this_thread::sleep_for(std::chrono::seconds(2));
// Send some test messages
conversation.sendUserMessage("Hello, how are you?");
std::this_thread::sleep_for(std::chrono::seconds(1));
//send contextual update
conversation.sendContextualUpdate("User is testing the system");
std::this_thread::sleep_for(std::chrono::seconds(1));
// Simulate audio input
conversation.sendAudio("dGVzdF9hdWRpb19kYXRh"); // "test_audio_data" in base64
// Run for 30 seconds
std::this_thread::sleep_for(std::chrono::seconds(30));
// End session
conversation.endSession();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment