Skip to content

Instantly share code, notes, and snippets.

@josep112
Last active August 14, 2025 17:02
Show Gist options
  • Select an option

  • Save josep112/142747002b6c1421642b6afa4a2ddf7c to your computer and use it in GitHub Desktop.

Select an option

Save josep112/142747002b6c1421642b6afa4a2ddf7c to your computer and use it in GitHub Desktop.
Asr groq
/*
* Copyright 2022 Sangoma Technologies Corporation
* Kevin Harwell <[email protected]>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const utils = require("./utils");
/**
 * Codecs this server is able to negotiate, in preference order.
 */
const supported = [
  { name: "ulaw", sampleRate: 8000, attributes: [] },
  { name: "slin16", sampleRate: 16000, attributes: [] },
  { name: "opus", sampleRate: 48000, attributes: [] },
];
/**
 * Checks if given codecs match.
 *
 * @param {Object} obj1 - A codec object
 * @param {Object} obj2 - A codec object
 * @return {boolean} true if the codec names match, otherwise false.
 */
const equal = (obj1, obj2) => obj1.name === obj2.name;
/**
 * Converts codecs to a comma separated string of codec names.
 *
 * @param {Object|Object[]} objs - Codecs to convert
 * @return {string} A comma separated string of codec names.
 */
function toString(objs) {
  const list = Array.isArray(objs) ? objs : [objs];
  return list.map((codec) => codec.name).join(", ");
}
/** @class Codecs. */
class Codecs {
  /**
   * Creates an instance of Codecs.
   *
   * @param {Object} options - Codec options
   * @param {Object|Object[]} [options.codecs] - Codecs to allow; when
   *     omitted, all supported codecs are allowed.
   */
  constructor(options) {
    this.codecs = options.codecs
      ? utils.intersect(supported, options.codecs, equal)
      : supported;
    // Default to the first codec in the allowed list.
    this.selected = this.codecs[0];
  }

  /**
   * Selects the first matching codec between supported and given codecs.
   *
   * @param {Object|Object[]} codecs - Codecs to intersect with supported
   * @return The first selected codec
   * @throws {Error} If none of the given codecs is supported.
   */
  first(codecs) {
    let match = null;
    try {
      match = utils.first(this.codecs, codecs, equal);
    } catch (e) {
      // Fall through to the uniform "not supported" error below.
    }
    if (match) {
      return match;
    }
    throw new Error("Codec " + toString(codecs) + " not supported");
  }
}
// Public API: codec negotiation helper class.
module.exports = {
Codecs,
}
/*
* Copyright 2022 Sangoma Technologies Corporation
* Kevin Harwell <[email protected]>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const { randomUUID } = require('crypto');
const { createEchoManager, wrapTTSFunction } = require('./echo_manager');
const { spawn } = require('child_process');
const path = require('path');
/**
 * Copies a thrown error's message onto a response message.
 *
 * @param {Error} e - The thrown error
 * @param {Object} msg - The response message to annotate
 */
function handleError(e, msg) {
  msg.error_msg = e.message;
}
/**
 * Serializes a message and sends it over the speech transport as a
 * text (non-binary) frame.
 *
 * @param {Object} speech - Speech object holding the transport
 * @param {Object} msg - The message to send
 */
function sendMessage(speech, msg) {
  const payload = JSON.stringify(msg);
  speech.transport.send(payload, { binary: false });
}
/**
 * Sends a "set" request carrying the given parameters over the transport.
 *
 * @param {Object} speech - Speech object holding the transport
 * @param {Object} params - Parameters to set on the remote side
 */
function sendSetRequest(speech, params) {
  // BUG FIX: declared locally; the original assigned to an undeclared
  // "request", leaking an implicit global (a crash in strict mode).
  const request = {
    request: "set",
    id: randomUUID(),
    params,
  };
  sendMessage(speech, request);
}
/**
 * Handles an incoming "get" request by filling the response with the
 * values of each requested parameter.
 *
 * @param {Object} speech - Speech object (codecs, languages, provider, echoManager)
 * @param {Object} request - The incoming request message
 * @param {Object} response - The response message to populate
 * @throws {Error} If the request carries no "params" list.
 */
function handleGetRequest(speech, request, response) {
  if (!request.params) {
    throw new Error("Missing request parameters");
  }
  const params = {};
  for (const name of request.params) {
    switch (name) {
      case "codec":
        params.codecs = speech.codecs.selected;
        break;
      case "language":
        params.language = speech.languages.selected;
        break;
      case "results":
        // Drain all accumulated provider results into the response.
        params.results = speech.provider.results.splice(0);
        break;
      case "echo_stats":
        // Echo cancellation statistics (null when no echo manager exists).
        params.echo_stats = speech.echoManager ? speech.echoManager.getStats() : null;
        break;
      default:
        console.warn("Ignoring unsupported parameter '" + name + "' in '" +
          request.request + "' request");
    }
  }
  response.params = params;
}
/**
 * Handles an incoming "set"/"setup" request.
 *
 * It's all or nothing for an incoming set request. So first validate
 * all values, then set newly selected, and lastly set the response.
 *
 * @param {Object} speech - Speech object
 * @param {Object} request - The incoming request message
 * @param {Object} response - The response message to populate
 * @throws {Error} If the request carries neither codecs nor params.
 */
function handleSetRequest(speech, request, response) {
  // BUG FIX: the guard used "||", which threw whenever either field was
  // missing — rejecting valid requests and making the later
  // "if (request.codecs)" check unreachable.
  if (!request.codecs && !request.params) {
    throw new Error("Missing request parameters");
  }
  let codec = null;
  let params = {};
  if (request.codecs) {
    codec = speech.codecs.first(request.codecs);
  }
  // Guard with "|| {}" so a codecs-only request doesn't crash here.
  for (let [k, v] of Object.entries(request.params || {})) {
    if (k == "language") {
      params.language = speech.languages.first(v);
    } else if (k == "echo_detection") {
      // Echo cancellation on/off toggle.
      if (speech.echoManager) {
        speech.echoManager.setEchoDetection(v);
        params.echo_detection = v;
      }
    } else if (k == "tts_text") {
      // BUG FIX: was `k == "tts_text" && k == "empresa"`, which is always
      // false, so TTS set-requests were silently ignored.
      if (speech.echoManager) {
        handleTTSRequest(speech, v, request.params.empresa)
          .then((result) => {
            console.log("TTS processed with echo control:", result.filePath);
          })
          .catch((error) => {
            console.error("TTS processing error:", error);
          });
      }
    } else if (k == "empresa") {
      // Companion parameter for "tts_text"; consumed there — no warning.
    } else {
      console.warn("Ignoring unsupported parameter '" + k + "' in '" +
        request.request + "' request");
    }
  }
  if (codec) {
    response.codecs = [speech.codecs.selected = codec];
  }
  if (Object.keys(params).length) {
    if (params.language) {
      speech.languages.selected = params.language;
    }
    response.params = params;
  }
  if (response.codecs || response.params) {
    // Start/Restart provider if any parameters were changed
    speech.provider.restart({
      codec: speech.codecs.selected,
      language: speech.languages.selected,
    });
  }
}
/**
 * Handles a TTS request: synthesizes the text with echo control and
 * notifies the remote side that the audio file was generated.
 *
 * @param {Object} speech - Speech object holding the echo manager
 * @param {string} text - Text to synthesize
 * @param {string} empresa - Company identifier for the TTS output
 * @return {Promise<Object>} Resolves with the TTS result descriptor
 * @throws {Error} If no echo manager has been initialized.
 */
async function handleTTSRequest(speech, text, empresa) {
  if (!speech.echoManager) {
    throw new Error("Echo manager not initialized");
  }
  try {
    // Keep echo detection active during synthesis so barge-in still works.
    const ttsOptions = { pauseDetection: false, resumeDelay: 200 };
    const result = await speech.echoManager.processTTSWithEchoControl(text, empresa, ttsOptions);
    // Notify the remote side that a TTS file is ready.
    const notification = {
      tts_generated: {
        filePath: result.filePath,
        text: result.text,
        empresa: result.empresa,
        timestamp: result.timestamp,
      },
    };
    sendSetRequest(speech, notification);
    return result;
  } catch (error) {
    console.error("TTS request handling error:", error);
    throw error;
  }
}
/**
 * Runs the traditional Piper TTS shell script and resolves with its
 * trimmed stdout (the path of the generated audio file).
 *
 * @param {string} text - Text to synthesize
 * @param {string} empresa - Company identifier passed to the script
 * @return {Promise<string>} Resolves with the script's stdout
 */
async function executePiperTTS(text, empresa) {
  return new Promise((resolve, reject) => {
    const piperScript = path.join(__dirname, '..', 'tts_piper.sh');
    // BUG FIX: renamed the child handle from "process", which shadowed
    // the Node.js global `process` inside this scope.
    const child = spawn('bash', [piperScript, text, empresa], {
      stdio: ['pipe', 'pipe', 'pipe']
    });
    let stdout = '';
    let stderr = '';
    child.stdout.on('data', (data) => {
      stdout += data.toString();
    });
    child.stderr.on('data', (data) => {
      stderr += data.toString();
    });
    child.on('close', (code) => {
      if (code === 0) {
        resolve(stdout.trim());
      } else {
        reject(new Error(`TTS failed: ${stderr}`));
      }
    });
    child.on('error', (error) => {
      reject(error);
    });
  });
}
/**
 * Routes an incoming request message to its handler and builds the
 * response; any handler error is reported via response.error_msg.
 *
 * @param {Object} speech - Speech object
 * @param {Object} msg - The incoming request message
 * @return {Object} The response message to send back
 */
function handleRequest(speech, msg) {
  const handlers = {
    "get": handleGetRequest,
    "set": handleSetRequest,
    "setup": handleSetRequest,
  };
  const response = { response: msg.request, id: msg.id };
  try {
    const handler = handlers[msg.request];
    // BUG FIX: an unknown request type used to surface as the confusing
    // internal TypeError "handlers[msg.request] is not a function".
    if (!handler) {
      throw new Error("Unsupported request '" + msg.request + "'");
    }
    handler(speech, msg, response);
  } catch (e) {
    handleError(e, response);
  }
  return response;
}
/**
 * Handles an incoming response message. Currently a stub.
 *
 * @param {Object} speech - Speech object
 * @param {Object} msg - The incoming response message
 * @return {null} Always null until response handling is implemented.
 */
function handleResponse(speech, msg) {
  // TODO: implement response handling.
  return null;
}
/**
 * Manages configuration, communication, messaging, and data between
 * a connected transport and speech provider with echo cancellation.
 *
 * @param {Object} speech - speech object
 * @param {Object} speech.codecs - allowed codec(s)
 * @param {Object} speech.languages - allowed language(s)
 * @param {Object} speech.transport - remote connection
 * @param {Object} speech.provider - speech provider
 * @param {Object} [speech.echoOptions] - echo cancellation options
 * @param {string} [speech.empresa] - default company to watch for TTS files
 */
function dispatch(speech) {
  // Lazily create the echo manager on first dispatch.
  if (!speech.echoManager && speech.provider) {
    console.log("Dispatcher: Initializing echo manager...");
    const echoOptions = speech.echoOptions || {
      ttsDirectory: "/usr/src/tts-tmp",
      piperScript: path.join(__dirname, '..', 'tts_piper.sh'),
      autoCleanup: true,
      cleanupMaxAge: 3600000 // 1 hour
    };
    speech.echoManager = createEchoManager(speech.provider, echoOptions);
    // Forward reference-audio load events to the client.
    speech.echoManager.on('referenceLoaded', (info) => {
      console.debug(`Dispatcher: Reference audio loaded - ${info.filePath} (${info.size} bytes)`);
      sendSetRequest(speech, {
        echo_reference_loaded: {
          filePath: path.basename(info.filePath),
          size: info.size,
          timestamp: info.timestamp
        }
      });
    });
    console.log("Dispatcher: Echo manager initialized");
  }
  // Handle of the periodic debug-stats logger (see below).
  let statsTimer = null;
  speech.transport.on("close", () => {
    console.log("Dispatcher: Transport closed, cleaning up...");
    speech.provider.end();
    // BUG FIX: stop the periodic stats logger; it previously kept
    // running (holding references) after the transport closed.
    if (statsTimer) {
      clearInterval(statsTimer);
      statsTimer = null;
    }
    if (speech.echoManager) {
      speech.echoManager.cleanup();
    }
  });
  speech.transport.on("message", (data, isBinary) => {
    if (isBinary) {
      // Binary audio data goes straight to the provider.
      speech.provider.write(data);
      return;
    }
    console.debug("message: " + data);
    // BUG FIX: a malformed text frame used to throw out of this handler;
    // drop it with a log entry instead of crashing the connection.
    let msg;
    try {
      msg = JSON.parse(data);
    } catch (e) {
      console.error("Dispatcher: Ignoring malformed message: " + e.message);
      return;
    }
    // Route by message type.
    if (msg.hasOwnProperty('request')) {
      msg = handleRequest(speech, msg);
    } else if (msg.hasOwnProperty('response')) {
      msg = handleResponse(speech, msg);
    } else if (msg.hasOwnProperty('tts_request')) {
      // Direct TTS request; replies asynchronously with a tts_response.
      handleTTSRequest(speech, msg.tts_request.text, msg.tts_request.empresa)
        .then(result => {
          sendMessage(speech, {
            tts_response: {
              id: msg.id || randomUUID(),
              success: true,
              filePath: result.filePath,
              timestamp: result.timestamp
            }
          });
        })
        .catch(error => {
          sendMessage(speech, {
            tts_response: {
              id: msg.id || randomUUID(),
              success: false,
              error: error.message
            }
          });
        });
      return;
    } else {
      msg = null;
    }
    if (msg) {
      sendMessage(speech, msg);
    }
  });
  speech.provider.on("result", (result) => {
    // Speech-to-text result produced by the provider.
    console.debug("Dispatcher: STT result received:", JSON.stringify(result));
    sendSetRequest(speech, { results: [ result ] });
  });
  // Start watching the company's TTS directory for automatic detection.
  if (speech.echoManager && speech.empresa) {
    speech.echoManager.startWatching(speech.empresa);
  }
  // Periodically log echo stats in development mode (debug aid).
  if (process.env.NODE_ENV === 'development') {
    statsTimer = setInterval(() => {
      if (speech.echoManager) {
        const stats = speech.echoManager.getStats();
        console.debug("Dispatcher: Echo stats -", JSON.stringify(stats));
      }
    }, 30000); // every 30 seconds
  }
}
/**
 * Builds a TTS function compatible with the legacy Piper call signature
 * that also feeds generated audio into the echo manager.
 *
 * @param {Object} echoManager - The echo manager instance
 * @return {Function} Wrapped asynchronous TTS function
 */
function createTTSWrapper(echoManager) {
  return wrapTTSFunction(executePiperTTS, echoManager);
}
// Public API of the dispatcher module.
module.exports = {
dispatch,
createTTSWrapper,
handleTTSRequest
};
/*
* Echo Manager - Integra com Piper TTS para cancelamento de eco
*
* Este módulo monitora os arquivos de áudio gerados pelo Piper
* e fornece referência para o cancelamento de eco no GroqProvider
*/
const fs = require("fs");
const path = require("path");
const EventEmitter = require("events");
const { spawn } = require("child_process");
class EchoManager extends EventEmitter {
  /**
   * Monitors Piper-generated TTS audio files and feeds them to a speech
   * provider as reference audio for echo cancellation.
   *
   * @param {Object} [options] - Manager options
   * @param {string} [options.ttsDirectory="/usr/src/tts-tmp"] - Root of per-company TTS output
   * @param {string} [options.piperScript="./tts_piper.sh"] - Piper TTS shell script
   */
  constructor(options = {}) {
    super();
    this.ttsDirectory = options.ttsDirectory || "/usr/src/tts-tmp";
    this.piperScript = options.piperScript || "./tts_piper.sh";
    this.speechProvider = null;
    this.watchedFiles = new Set();
    // One fs.FSWatcher per watched company (empresa).
    this.watchers = new Map();
    // Handle of the periodic cleanup task; null until startCleanupTask().
    this.cleanupTimer = null;
    console.log("EchoManager: Initialized");
  }

  /**
   * Associates a speech provider to receive reference audio for echo
   * cancellation.
   *
   * @param {Object} provider - Provider exposing loadReferenceFromFile,
   *     clearReferenceBuffer and setEchoDetection
   */
  setSpeechProvider(provider) {
    this.speechProvider = provider;
    console.log("EchoManager: Speech provider associated");
  }

  /**
   * Starts watching a company's TTS output directory for new .wav files.
   *
   * @param {string} empresa - Company identifier (subdirectory name)
   */
  startWatching(empresa) {
    const empresaDir = path.join(this.ttsDirectory, empresa);
    if (!fs.existsSync(empresaDir)) {
      fs.mkdirSync(empresaDir, { recursive: true });
    }
    if (this.watchers.has(empresa)) {
      console.debug(`EchoManager: Already watching ${empresa}`);
      return;
    }
    const watcher = fs.watch(empresaDir, { persistent: false }, (eventType, filename) => {
      // "rename" fires on file creation; only .wav output is relevant.
      if (eventType === 'rename' && filename && filename.endsWith('.wav')) {
        const filePath = path.join(empresaDir, filename);
        this.handleNewTTSFile(filePath, empresa);
      }
    });
    this.watchers.set(empresa, watcher);
    console.log(`EchoManager: Started watching TTS directory for ${empresa}: ${empresaDir}`);
  }

  /**
   * Stops watching a company's TTS output directory.
   *
   * @param {string} empresa - Company identifier
   */
  stopWatching(empresa) {
    const watcher = this.watchers.get(empresa);
    if (watcher) {
      watcher.close();
      this.watchers.delete(empresa);
      console.log(`EchoManager: Stopped watching TTS directory for ${empresa}`);
    }
  }

  /**
   * Runs the Piper TTS script and resolves with file information. Also
   * schedules loading of the generated file as echo reference audio.
   *
   * @param {string} text - Text to synthesize
   * @param {string} empresa - Company identifier
   * @return {Promise<Object>} {filePath, text, empresa, timestamp}
   */
  async generateTTS(text, empresa) {
    return new Promise((resolve, reject) => {
      // Make sure the company's output directory is being watched.
      this.startWatching(empresa);
      console.debug(`EchoManager: Generating TTS for: "${text.substring(0, 50)}..."`);
      // BUG FIX: renamed from "process" to avoid shadowing the Node global.
      const child = spawn('bash', [this.piperScript, text, empresa], {
        stdio: ['pipe', 'pipe', 'pipe']
      });
      let stdout = '';
      let stderr = '';
      child.stdout.on('data', (data) => {
        stdout += data.toString();
      });
      child.stderr.on('data', (data) => {
        stderr += data.toString();
      });
      child.on('close', (code) => {
        if (code === 0) {
          const outputPath = stdout.trim();
          console.debug(`EchoManager: TTS generated: ${outputPath}`);
          // Small delay so the file is fully closed before reading it.
          setTimeout(() => {
            this.loadReferenceAudio(outputPath);
          }, 100);
          resolve({
            filePath: outputPath,
            text: text,
            empresa: empresa,
            timestamp: Date.now()
          });
        } else {
          console.error(`EchoManager: TTS generation failed with code ${code}`);
          console.error(`EchoManager: stderr: ${stderr}`);
          reject(new Error(`TTS generation failed: ${stderr}`));
        }
      });
      child.on('error', (error) => {
        console.error(`EchoManager: Failed to start TTS process: ${error.message}`);
        reject(error);
      });
    });
  }

  /**
   * Handles new TTS files detected by the directory watcher.
   *
   * @param {string} filePath - Full path of the new .wav file
   * @param {string} empresa - Company the file belongs to
   */
  handleNewTTSFile(filePath, empresa) {
    if (this.watchedFiles.has(filePath)) {
      return; // Already processed.
    }
    this.watchedFiles.add(filePath);
    console.debug(`EchoManager: New TTS file detected: ${filePath}`);
    // Wait briefly so the file is fully written before reading it.
    setTimeout(() => {
      this.loadReferenceAudio(filePath);
    }, 150);
    // Bound the processed-file cache (evicts oldest insertion).
    if (this.watchedFiles.size > 100) {
      const firstFile = this.watchedFiles.values().next().value;
      this.watchedFiles.delete(firstFile);
    }
  }

  /**
   * Loads reference audio from a file into the speech provider.
   *
   * @param {string} filePath - Path of the reference .wav file
   */
  async loadReferenceAudio(filePath) {
    if (!this.speechProvider) {
      console.debug("EchoManager: No speech provider associated");
      return;
    }
    try {
      if (!fs.existsSync(filePath)) {
        console.debug(`EchoManager: Reference file not found: ${filePath}`);
        return;
      }
      const stats = fs.statSync(filePath);
      if (stats.size === 0) {
        console.debug(`EchoManager: Reference file is empty: ${filePath}`);
        return;
      }
      console.debug(`EchoManager: Loading reference audio: ${filePath} (${stats.size} bytes)`);
      // Delegate the actual loading to the provider.
      await this.speechProvider.loadReferenceFromFile(filePath);
      this.emit('referenceLoaded', {
        filePath: filePath,
        size: stats.size,
        timestamp: Date.now()
      });
    } catch (error) {
      console.error(`EchoManager: Error loading reference audio from ${filePath}:`, error.message);
    }
  }

  /**
   * Clears old reference audio from the provider's buffer.
   */
  clearOldReferences() {
    if (this.speechProvider) {
      this.speechProvider.clearReferenceBuffer();
      console.debug("EchoManager: Cleared old reference buffer");
    }
  }

  /**
   * Enables or disables echo detection on the provider.
   *
   * @param {boolean} enabled - Whether detection is enabled
   */
  setEchoDetection(enabled) {
    if (this.speechProvider) {
      this.speechProvider.setEchoDetection(enabled);
      console.log(`EchoManager: Echo detection ${enabled ? 'enabled' : 'disabled'}`);
    }
  }

  /**
   * Generates TTS while optionally pausing echo detection during
   * synthesis and re-enabling it afterwards.
   *
   * @param {string} text - Text to synthesize
   * @param {string} empresa - Company identifier
   * @param {Object} [options] - Control options
   * @param {boolean} [options.pauseDetection] - Pause detection while generating
   * @param {number} [options.resumeDelay=500] - Delay (ms) before re-enabling
   * @return {Promise<Object>} The generateTTS result descriptor
   */
  async processTTSWithEchoControl(text, empresa, options = {}) {
    try {
      if (options.pauseDetection) {
        this.setEchoDetection(false);
      }
      const result = await this.generateTTS(text, empresa);
      // Re-enable detection after a short grace period.
      if (options.pauseDetection) {
        setTimeout(() => {
          this.setEchoDetection(true);
        }, options.resumeDelay || 500);
      }
      return result;
    } catch (error) {
      // Re-enable detection on failure so it is never left off.
      if (options.pauseDetection) {
        this.setEchoDetection(true);
      }
      throw error;
    }
  }

  /**
   * Starts the periodic task that purges old TTS files.
   * BUG FIX: the interval handle is now stored (and any previous task
   * stopped first) so cleanup() can cancel it; previously the handle was
   * discarded and the interval leaked forever.
   *
   * @param {number} [maxAge=3600000] - Maximum file age in ms (1 hour)
   */
  startCleanupTask(maxAge = 3600000) {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
    }
    this.cleanupTimer = setInterval(() => {
      this.cleanupOldFiles(maxAge);
    }, 300000); // run every 5 minutes
    console.log("EchoManager: Cleanup task started");
  }

  /**
   * Removes TTS files older than maxAge from all watched directories.
   *
   * @param {number} maxAge - Maximum file age in milliseconds
   */
  cleanupOldFiles(maxAge) {
    try {
      const now = Date.now();
      for (const empresa of this.watchers.keys()) {
        const empresaDir = path.join(this.ttsDirectory, empresa);
        if (!fs.existsSync(empresaDir)) continue;
        const files = fs.readdirSync(empresaDir);
        let deletedCount = 0;
        for (const file of files) {
          if (!file.endsWith('.wav')) continue;
          const filePath = path.join(empresaDir, file);
          try {
            const stats = fs.statSync(filePath);
            const age = now - stats.mtime.getTime();
            if (age > maxAge) {
              fs.unlinkSync(filePath);
              this.watchedFiles.delete(filePath);
              deletedCount++;
            }
          } catch (error) {
            // The file may have been removed by another process; ignore.
          }
        }
        if (deletedCount > 0) {
          console.debug(`EchoManager: Cleaned up ${deletedCount} old TTS files for ${empresa}`);
        }
      }
    } catch (error) {
      console.error("EchoManager: Error during cleanup:", error.message);
    }
  }

  /**
   * Returns echo-cancellation statistics.
   *
   * @return {Object} {watchedCompanies, cachedFiles, referenceBufferSize,
   *     echoDetectionEnabled}
   */
  getStats() {
    return {
      watchedCompanies: this.watchers.size,
      cachedFiles: this.watchedFiles.size,
      referenceBufferSize: this.speechProvider ?
        this.speechProvider.referenceAudioBuffer.length : 0,
      echoDetectionEnabled: this.speechProvider ?
        this.speechProvider.echoDetectionEnabled : false
    };
  }

  /**
   * Closes all watchers, stops the cleanup task, and releases resources.
   */
  cleanup() {
    console.log("EchoManager: Cleaning up...");
    // BUG FIX: stop the periodic cleanup interval, which previously kept
    // running (and kept the process alive) after cleanup.
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
    for (const watcher of this.watchers.values()) {
      watcher.close();
    }
    this.watchers.clear();
    this.watchedFiles.clear();
    if (this.speechProvider) {
      this.speechProvider.clearReferenceBuffer();
    }
    console.log("EchoManager: Cleanup completed");
  }
}
/**
 * Convenience factory: builds an EchoManager, wires it to a speech
 * provider, and (by default) starts the automatic cleanup task.
 *
 * @param {Object} speechProvider - Provider receiving reference audio
 * @param {Object} [options] - EchoManager options
 * @param {boolean} [options.autoCleanup=true] - Start the cleanup task
 * @param {number} [options.cleanupMaxAge] - Max TTS file age in ms
 * @return {EchoManager} The configured manager
 */
function createEchoManager(speechProvider, options = {}) {
  const manager = new EchoManager(options);
  manager.setSpeechProvider(speechProvider);
  const autoCleanup = options.autoCleanup !== false;
  if (autoCleanup) {
    // Periodically purge stale TTS files.
    manager.startCleanupTask(options.cleanupMaxAge);
  }
  return manager;
}
/**
 * Middleware for existing systems: wraps a TTS function so that any
 * generated .wav file is also loaded into the echo manager as
 * reference audio.
 *
 * @param {Function} originalTTSFunction - Async TTS function to wrap
 * @param {Object} echoManager - Manager receiving reference audio
 * @return {Function} Wrapped TTS function with the same signature
 */
function wrapTTSFunction(originalTTSFunction, echoManager) {
  return async (text, empresa, ...args) => {
    try {
      const result = await originalTTSFunction(text, empresa, ...args);
      const isWavPath = typeof result === 'string' && result.includes('.wav');
      if (isWavPath) {
        // Small delay so the file is fully flushed before reading it.
        setTimeout(() => {
          echoManager.loadReferenceAudio(result);
        }, 100);
      }
      return result;
    } catch (error) {
      console.error("Wrapped TTS function error:", error);
      throw error;
    }
  };
}
// Public API of the echo manager module.
module.exports = {
EchoManager,
createEchoManager,
wrapTTSFunction
};
#!/usr/bin/env node
/*
* Copyright 2022 Sangoma Technologies Corporation
* Kevin Harwell <[email protected]>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const { Codecs } = require("./lib/codecs");
const { Languages } = require("./lib/languages");
const { getProvider } = require("./lib/provider");
const { getServer } = require("./lib/server");
const { dispatch } = require("./lib/dispatcher");
const dotenv = require("dotenv");
const path = require("path");
// Load environment settings (e.g. GROQ_API_KEY) from the fixed deployment path.
dotenv.config({ path: '/usr/src/groq/.env' });
// Command-line definition: server options plus echo-cancellation tuning knobs.
const argv = require("yargs/yargs")(process.argv.slice(2))
.command("$0 [options]", "Start a speech to text server with echo cancellation", {
port: {
alias: "p",
desc: "Port to listen on",
default: 9099,
type: "number",
group: "Server",
},
"tts-dir": {
desc: "TTS output directory",
default: "/usr/src/tts-tmp",
type: "string",
group: "Echo Cancellation",
},
"piper-script": {
desc: "Path to Piper TTS script",
default: "./tts_piper.sh",
type: "string",
group: "Echo Cancellation",
},
"echo-detection": {
desc: "Enable echo detection",
default: true,
type: "boolean",
group: "Echo Cancellation",
},
"cleanup-age": {
desc: "Max age for TTS files in milliseconds",
default: 3600000, // 1 hour
type: "number",
group: "Echo Cancellation",
},
"correlation-threshold": {
desc: "Echo correlation threshold (0-1)",
default: 0.7,
type: "number",
group: "Echo Cancellation",
},
"empresa": {
desc: "Default company/empresa for TTS monitoring",
type: "string",
group: "Echo Cancellation",
},
"debug-echo": {
desc: "Enable detailed echo cancellation logging",
default: false,
type: "boolean",
group: "Debug",
}
})
.strict()
.argv;
const codecs = new Codecs(argv);
const languages = new Languages(argv);
const server = getServer("ws", argv);
// Echo cancellation configuration assembled from the CLI arguments.
const echoOptions = {
ttsDirectory: argv["tts-dir"],
piperScript: path.resolve(argv["piper-script"]),
autoCleanup: true,
cleanupMaxAge: argv["cleanup-age"],
correlationThreshold: argv["correlation-threshold"],
debugMode: argv["debug-echo"]
};
console.log('Language being sent to Groq API:', languages.languages[0]);
console.log('GROQ_API_KEY:', process.env.GROQ_API_KEY ? 'Set' : 'Not set');
console.log('Echo Cancellation Config:', {
enabled: argv["echo-detection"],
ttsDirectory: echoOptions.ttsDirectory,
piperScript: echoOptions.piperScript,
correlationThreshold: echoOptions.correlationThreshold,
cleanupAge: `${echoOptions.cleanupMaxAge / 1000}s`
});
// Dependency checks: warn if the TTS script is missing and make sure the
// output directory exists before accepting connections.
if (!require('fs').existsSync(echoOptions.piperScript)) {
console.warn(`Warning: Piper script not found at ${echoOptions.piperScript}`);
console.warn('Echo cancellation may not work properly');
}
if (!require('fs').existsSync(echoOptions.ttsDirectory)) {
console.log(`Creating TTS directory: ${echoOptions.ttsDirectory}`);
require('fs').mkdirSync(echoOptions.ttsDirectory, { recursive: true });
}
server.on("connection", (client) => {
console.log("New client connected");
// Create a speech provider for this connection.
const provider = getProvider("groq", argv);
// Speech object configuration, including echo-cancellation options.
const speechConfig = {
codecs: codecs,
languages: languages,
transport: client,
provider: provider,
echoOptions: echoOptions,
empresa: argv.empresa // Default company, if provided
};
// Enable/disable echo detection based on command-line arguments.
if (!argv["echo-detection"]) {
console.log("Echo detection disabled by command line argument");
// If the provider supports it, disable detection shortly after creation.
setTimeout(() => {
if (provider.setEchoDetection) {
provider.setEchoDetection(false);
}
}, 100);
}
// Debug-only listener: log incoming request/TTS messages.
if (argv["debug-echo"]) {
client.on('message', (data, isBinary) => {
if (!isBinary) {
try {
const msg = JSON.parse(data);
if (msg.request || msg.tts_request) {
console.debug("Debug: Incoming message:", JSON.stringify(msg, null, 2));
}
} catch (e) {
// Ignore parse errors while debugging.
}
}
});
}
// Start dispatching with the assembled configuration.
dispatch(speechConfig);
// Periodic echo-manager stats logging in debug mode.
if (argv["debug-echo"]) {
const statsInterval = setInterval(() => {
if (client.readyState === client.OPEN && speechConfig.echoManager) {
const stats = speechConfig.echoManager.getStats();
console.debug("Echo Manager Stats:", JSON.stringify(stats, null, 2));
} else {
clearInterval(statsInterval);
}
}, 15000); // every 15 seconds
}
});
// Graceful shutdown
process.on("SIGINT", () => {
console.log("Received SIGINT, shutting down gracefully...");
server.close();
process.exit(0);
});
process.on("SIGTERM", () => {
console.log("Received SIGTERM, shutting down gracefully...");
server.close();
process.exit(0);
});
// Log once the server is ready to accept connections.
server.on('listening', () => {
console.log(`
=== Speech-to-Text Server with Echo Cancellation ===
Port: ${argv.port}
Echo Detection: ${argv["echo-detection"] ? 'Enabled' : 'Disabled'}
TTS Directory: ${echoOptions.ttsDirectory}
Piper Script: ${echoOptions.piperScript}
Correlation Threshold: ${echoOptions.correlationThreshold}
Debug Mode: ${argv["debug-echo"] ? 'Enabled' : 'Disabled'}
Server is ready to accept connections!
===================================================
`);
});
// Handlers for uncaught errors: exit on exceptions, log rejections.
process.on('uncaughtException', (error) => {
console.error('Uncaught Exception:', error);
process.exit(1);
});
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
});
// NOTE(review): exporting from an executable script is unusual —
// presumably consumed by tests; confirm before removing.
module.exports = {
server,
echoOptions,
codecs,
languages
};
/*
* Copyright 2022 Sangoma Technologies Corporation
* Kevin Harwell <[email protected]>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const utils = require("./utils");
/**
 * Languages this server is able to negotiate.
 */
const supported = ["pt-BR"];
/**
 * Converts languages to a comma separated string of language names.
 *
 * @param {Object|Object[]} objs - Languages to convert
 * @return {string} A comma separated string of language names.
 */
function toString(objs) {
  const list = Array.isArray(objs) ? objs : [objs];
  return list.join(", ");
}
/** @class Languages. */
class Languages {
  /**
   * Creates an instance of Languages.
   *
   * @param {Object} options - Language options
   * @param {string|string[]} [options.languages] - Languages to allow; when
   *     omitted, all supported languages are allowed.
   */
  constructor(options) {
    // BUG FIX: removed the dead `this.languages = ['pt-BR']` assignment
    // that was immediately overwritten by the line below.
    this.languages = options.languages ?
      utils.intersect(supported, options.languages) : supported;
    this.selected = this.languages[0]; // Default to first in list
  }
  /**
   * Selects the first matching language between supported and given languages
   *
   * @param {Object|Object[]} languages - Languages to intersect with supported
   * @return The first selected language
   * @throws {Error} If none of the given languages is supported.
   */
  first(languages) {
    try {
      let res = utils.first(this.languages, languages);
      if (res) {
        return res;
      }
    } catch (e) {
      // Fall through to the uniform "not supported" error below.
    }
    throw new Error("Language " + toString(languages) + " not supported");
  }
}
// Public API: language negotiation helper class.
module.exports = {
Languages,
}
{
"name": "aeap-speech-to-text",
"description": "asterisk external speech to text application",
"version": "0.1.0",
"homepage": "https://github.com/asterisk/aeap-speech-to-text#README.md",
"main": "/usr/src/groq/index.js",
"bin": "/usr/src/groq/index.js",
"license": "Apache-2.0",
"readmeFilename": "README.md",
"repository": {
"type": "git",
"url": "https://github.com/asterisk/aeap-speech-to-text.git"
},
"keywords": [
"asterisk",
"asterisk external application protocol",
"aeap",
"speech"
],
"dependencies": {
"dotenv": "^17.2.0",
"fluent-ffmpeg": "^2.1.3",
"groq-sdk": "^0.3.0",
"node-vad": "^1.1.4",
"wav": "^1.0.2",
"ws": "^8.3.0",
"yargs": "^17.3.1"
}
}
/*
* Copyright 2022 Sangoma Technologies Corporation
* Kevin Harwell <[email protected]>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const { Writable } = require("stream");
const Groq = require("groq-sdk");
const fs = require("fs");
const path = require("path");
const wav = require("wav");
const VAD = require("node-vad");
// Default provider configuration (encoding/sample rate match the "ulaw"
// telephony codec; model is Groq's Whisper turbo variant).
const DEFAULT_ENCODING = "MULAW";
const DEFAULT_SAMPLE_RATE = 8000;
const DEFAULT_LANGUAGE = "pt";
const DEFAULT_MODEL = "whisper-large-v3-turbo";
const DEFAULT_MAX_RESULTS = 100;
// VAD parameters
const VAD_MODE = 0;
const VAD_SAMPLE_RATE = 8000;
// Utterance/processing control parameters — tuning knobs; their exact use
// is inside the provider class below (not fully visible here).
const MIN_AUDIO_DURATION_MS = 500;
const MIN_AUDIO_BUFFER_SIZE = 4000;
const SILENCE_DURATION_MS = 300;
const MIN_TEXT_LENGTH = 2;
const COOLDOWN_PERIOD_MS = 1000;
// Echo cancellation parameters
const ECHO_BUFFER_DURATION_MS = 3000; // Keep 3 seconds of reference audio
const CORRELATION_THRESHOLD = 0.7; // Correlation threshold for classifying echo
const ECHO_DECAY_FACTOR = 0.8; // Echo decay factor
const SPECTRAL_ANALYSIS_WINDOW = 512; // Window size for spectral analysis
/**
* @class GroqProvider.
*
* Start, restart, and stop Groq Whisper speech to text recognition with echo cancellation.
*
* @extends Writable
*/
class GroqProvider extends Writable {
/* Mapped encodings supported by Groq */
static encodings = {
ulaw: "MULAW",
slin16: "LINEAR16",
opus: "OGG_OPUS",
};
/* Languages this provider supports */
static languages = [
"pt",
"pt-BR",
"en",
"es",
"fr",
"de",
"it",
"ja",
"ko",
"zh",
];
/**
* Creates an instance of a Groq provider stream.
*
* @param {Object} [options] - provider specific options
* @param {Object} [options.maxResults=100] - The maximum number of results
* @param {Object} [options.echoReference] - Echo reference audio callback
*/
constructor(options) {
super();
this.config = {
encoding: DEFAULT_ENCODING,
sampleRateHertz: DEFAULT_SAMPLE_RATE,
languageCode: DEFAULT_LANGUAGE,
model: DEFAULT_MODEL,
};
this.maxResults = options && options.maxResults || DEFAULT_MAX_RESULTS;
this.results = [];
this.audioBuffer = Buffer.alloc(0);
this.isProcessing = false;
this.tempFileCounter = 0;
this.vad = new VAD(VAD.Mode.NORMAL);
// Estados para controle de VAD
this.speechDetected = false;
this.speechStartTime = null;
this.silenceStartTime = null;
this.lastProcessTime = 0;
this.consecutiveSilenceFrames = 0;
this.consecutiveVoiceFrames = 0;
// Thresholds para detecção
this.VOICE_FRAMES_THRESHOLD = 2;
this.SILENCE_FRAMES_THRESHOLD = 3;
// Echo Cancellation - Buffers de referência
this.referenceAudioBuffer = Buffer.alloc(0);
this.referenceBufferMaxSize = Math.floor(ECHO_BUFFER_DURATION_MS * this.config.sampleRateHertz / 1000);
this.echoDetectionEnabled = true;
this.lastReferenceTime = 0;
// Análise espectral para detecção de eco
this.fftCache = new Map();
console.log("GroqProvider: Echo cancellation enabled");
}
_construct(callback) {
this.client = new Groq({
apiKey: process.env.GROQ_API_KEY,
});
callback();
}
/**
* Adiciona áudio de referência do TTS para cancelamento de eco
* @param {Buffer} referenceAudio - Buffer de áudio de referência (PCM)
*/
addReferenceAudio(referenceAudio) {
if (!this.echoDetectionEnabled) return;
const now = Date.now();
this.lastReferenceTime = now;
// Converte referência para o mesmo formato do input (MULAW para PCM)
let pcmReference;
if (Buffer.isBuffer(referenceAudio)) {
pcmReference = referenceAudio;
} else {
pcmReference = this.mulawToPcm(referenceAudio);
}
// Adiciona ao buffer de referência
this.referenceAudioBuffer = Buffer.concat([this.referenceAudioBuffer, pcmReference]);
// Limita o tamanho do buffer de referência
if (this.referenceAudioBuffer.length > this.referenceBufferMaxSize * 2) {
const excessBytes = this.referenceAudioBuffer.length - this.referenceBufferMaxSize * 2;
this.referenceAudioBuffer = this.referenceAudioBuffer.slice(excessBytes);
}
console.debug(`GroqProvider: Reference audio added (${pcmReference.length} bytes, total: ${this.referenceAudioBuffer.length})`);
}
/**
 * Loads reference audio from a WAV file generated by Piper.
 * @param {string} filePath - Path to the WAV file
 */
async loadReferenceFromFile(filePath) {
try {
if (!fs.existsSync(filePath)) {
console.debug(`GroqProvider: Reference file not found: ${filePath}`);
return;
}
const stats = fs.statSync(filePath);
console.debug(`GroqProvider: Loading reference file: ${filePath} (${stats.size} bytes)`);
// Lê o arquivo WAV
const audioData = fs.readFileSync(filePath);
// Para simplificar, assume que é um WAV de 16-bit mono a 8kHz
// Pula o header WAV (44 bytes) e lê os dados PCM
const headerSize = 44;
if (audioData.length > headerSize) {
const pcmData = audioData.slice(headerSize);
this.addReferenceAudio(pcmData);
console.debug(`GroqProvider: Reference audio loaded from file (${pcmData.length} bytes)`);
}
} catch (error) {
console.error(`GroqProvider: Error loading reference file ${filePath}:`, error.message);
}
}
/**
 * Computes the correlation between input and reference audio.
 * @param {Buffer} inputPcm - Input audio (16-bit LE PCM)
 * @param {Buffer} referencePcm - Reference audio (16-bit LE PCM)
 * @returns {number} - Correlation in [0, 1]
 */
calculateCorrelation(inputPcm, referencePcm) {
if (!referencePcm || referencePcm.length < 2 || inputPcm.length < 2) {
return 0;
}
const minLength = Math.min(inputPcm.length, referencePcm.length);
const sampleCount = Math.floor(minLength / 2); // 16-bit samples
if (sampleCount < 100) return 0; // Muito pouco áudio para análise
let correlation = 0;
let inputSum = 0;
let refSum = 0;
let inputSqSum = 0;
let refSqSum = 0;
// Calcula correlação de Pearson
for (let i = 0; i < sampleCount; i++) {
const inputSample = inputPcm.readInt16LE(i * 2);
const refSample = referencePcm.readInt16LE(i * 2);
inputSum += inputSample;
refSum += refSample;
inputSqSum += inputSample * inputSample;
refSqSum += refSample * refSample;
correlation += inputSample * refSample;
}
const numerator = correlation - (inputSum * refSum) / sampleCount;
const denominator = Math.sqrt(
(inputSqSum - (inputSum * inputSum) / sampleCount) *
(refSqSum - (refSum * refSum) / sampleCount)
);
return denominator > 0 ? Math.abs(numerator / denominator) : 0;
}
/**
 * Decides whether the input audio is an echo of recent TTS output.
 * @param {Buffer} inputAudio - Input audio (16-bit LE PCM)
 * @returns {boolean} - true when the audio correlates with the TTS reference
 */
isEchoAudio(inputAudio) {
if (!this.echoDetectionEnabled || this.referenceAudioBuffer.length < 1000) {
return false;
}
const now = Date.now();
// Se faz muito tempo que não há áudio de referência, não é eco
if (now - this.lastReferenceTime > ECHO_BUFFER_DURATION_MS) {
return false;
}
// Calcula correlação com diferentes delays
const maxDelayMs = 500; // 500ms de delay máximo
const maxDelaySamples = Math.floor(maxDelayMs * this.config.sampleRateHertz / 1000) * 2;
let maxCorrelation = 0;
const step = Math.max(1, Math.floor(maxDelaySamples / 20)); // 20 pontos de teste
for (let delay = 0; delay < Math.min(maxDelaySamples, this.referenceAudioBuffer.length); delay += step) {
const refSlice = this.referenceAudioBuffer.slice(delay, delay + inputAudio.length);
if (refSlice.length < inputAudio.length / 2) continue;
const correlation = this.calculateCorrelation(inputAudio, refSlice);
maxCorrelation = Math.max(maxCorrelation, correlation);
if (maxCorrelation > CORRELATION_THRESHOLD) {
break;
}
}
const isEcho = maxCorrelation > CORRELATION_THRESHOLD;
if (isEcho) {
console.debug(`GroqProvider: Echo detected (correlation: ${maxCorrelation.toFixed(3)})`);
}
return isEcho;
}
_write(chunk, encoding, callback) {
// Adiciona o chunk ao buffer
this.audioBuffer = Buffer.concat([this.audioBuffer, chunk]);
// Converte MULAW para PCM para análise
const pcmChunk = this.mulawToPcm(chunk);
// Verifica se é eco antes de processar com VAD
if (this.isEchoAudio(pcmChunk)) {
console.debug("GroqProvider: Skipping echo audio");
// Remove o chunk do buffer se for eco
this.audioBuffer = this.audioBuffer.slice(chunk.length);
callback();
return;
}
// Processa normalmente com VAD se não for eco
this.vad.processAudio(pcmChunk, VAD_SAMPLE_RATE)
.then((res) => {
this.handleVADResult(res);
callback();
})
.catch((err) => {
console.error("VAD Processing Error:", err);
callback(err);
});
}
_writev(chunks, callback) {
// Concatena todos os chunks primeiro
const allChunks = chunks.map(c => c.chunk);
const combinedChunk = Buffer.concat(allChunks);
// Converte para análise
const pcmChunk = this.mulawToPcm(combinedChunk);
// Verifica se é eco
if (this.isEchoAudio(pcmChunk)) {
console.debug("GroqProvider: Skipping echo audio (batch)");
callback();
return;
}
// Adiciona ao buffer e processa normalmente
this.audioBuffer = Buffer.concat([this.audioBuffer, combinedChunk]);
this.vad.processAudio(pcmChunk, VAD_SAMPLE_RATE)
.then((res) => {
this.handleVADResult(res);
callback();
})
.catch((err) => {
console.error("VAD Processing Error:", err);
callback(err);
});
}
/**
 * Handles a single VAD event, updating the speech/silence state machine.
 */
handleVADResult(vadResult) {
const now = Date.now();
switch (vadResult) {
case VAD.Event.VOICE:
this.consecutiveVoiceFrames++;
this.consecutiveSilenceFrames = 0;
if (this.consecutiveVoiceFrames >= this.VOICE_FRAMES_THRESHOLD && !this.speechDetected) {
console.debug("GroqProvider: Speech detected");
this.speechDetected = true;
this.speechStartTime = now;
this.silenceStartTime = null;
}
break;
case VAD.Event.SILENCE:
this.consecutiveSilenceFrames++;
this.consecutiveVoiceFrames = 0;
if (this.consecutiveSilenceFrames >= this.SILENCE_FRAMES_THRESHOLD) {
if (this.speechDetected && !this.silenceStartTime) {
console.debug("GroqProvider: Silence detected after speech");
this.silenceStartTime = now;
}
if (this.speechDetected && this.silenceStartTime &&
(now - this.silenceStartTime) >= SILENCE_DURATION_MS) {
this.maybeProcessAudioChunk();
}
}
break;
case VAD.Event.ERROR:
console.error("VAD Error");
break;
}
// Log periódico do estado
if (now % 2000 < 100) {
console.debug(`GroqProvider: State - Voice: ${this.consecutiveVoiceFrames}, Silence: ${this.consecutiveSilenceFrames}, Speech: ${this.speechDetected}, Buffer: ${this.audioBuffer.length}, Ref: ${this.referenceAudioBuffer.length}`);
}
}
_final(callback) {
if (this.audioBuffer.length > 0 && this.speechDetected) {
this.maybeProcessAudioChunk();
}
callback();
}
/**
 * Decides whether the buffered audio chunk should be transcribed now.
 */
maybeProcessAudioChunk() {
const now = Date.now();
if (now - this.lastProcessTime < COOLDOWN_PERIOD_MS) {
console.debug("GroqProvider: Still in cooldown period");
return;
}
if (this.isProcessing) {
console.debug("GroqProvider: Already processing");
return;
}
if (this.audioBuffer.length < MIN_AUDIO_BUFFER_SIZE) {
console.debug(`GroqProvider: Buffer too small (${this.audioBuffer.length} < ${MIN_AUDIO_BUFFER_SIZE})`);
return;
}
if (this.speechStartTime && (now - this.speechStartTime) < MIN_AUDIO_DURATION_MS) {
console.debug(`GroqProvider: Speech too short (${now - this.speechStartTime}ms < ${MIN_AUDIO_DURATION_MS}ms)`);
return;
}
// Análise final de eco no buffer completo
const pcmBuffer = this.mulawToPcm(this.audioBuffer);
if (this.isEchoAudio(pcmBuffer)) {
console.debug("GroqProvider: Final echo check - skipping audio");
this.resetState();
return;
}
console.debug(`GroqProvider: Processing audio chunk (${this.audioBuffer.length} bytes, ${now - this.speechStartTime}ms duration)`);
this.processAudioChunk();
}
/**
 * Transcribes the accumulated audio chunk via Groq Whisper.
 */
async processAudioChunk() {
if (this.isProcessing || this.audioBuffer.length === 0) {
return;
}
this.isProcessing = true;
const processStartTime = Date.now();
try {
// Cria um arquivo temporário WAV
const tempFileName = `temp_audio_${this.tempFileCounter++}_${Date.now()}.wav`;
const tempFilePath = path.join("/tmp", tempFileName);
// Converte o buffer de áudio para WAV
await this.convertToWav(this.audioBuffer, tempFilePath);
console.debug(`GroqProvider: Sending ${this.audioBuffer.length} bytes to Groq`);
// Envia para o Groq Whisper
const transcription = await this.client.audio.transcriptions.create({
file: fs.createReadStream(tempFilePath),
model: this.config.model,
language: this.config.languageCode,
response_format: "json",
temperature: 0.0,
});
// Limpa o arquivo temporário
fs.unlinkSync(tempFilePath);
// Verifica se o texto é válido
if (transcription.text && transcription.text.trim() &&
transcription.text.trim().length >= MIN_TEXT_LENGTH &&
!this.isRepetitiveText(transcription.text.trim()) &&
!this.isTTSText(transcription.text.trim())) {
const result = {
text: transcription.text.trim(),
score: 95,
};
console.debug("GroqProvider: Valid result: " + JSON.stringify(result));
this.emit("result", result);
if (this.results.length === this.maxResults) {
this.results.shift();
}
this.results.push(result);
} else {
console.debug(`GroqProvider: Ignoring invalid/TTS text: "${transcription.text}"`);
}
} catch (error) {
console.error("GroqProvider: Error processing audio chunk:", error);
} finally {
this.resetState();
this.lastProcessTime = Date.now();
this.isProcessing = false;
console.debug(`GroqProvider: Processing completed in ${Date.now() - processStartTime}ms`);
}
}
/**
 * Resets capture/VAD state after processing.
 */
resetState() {
this.audioBuffer = Buffer.alloc(0);
this.speechDetected = false;
this.speechStartTime = null;
this.silenceStartTime = null;
this.consecutiveSilenceFrames = 0;
this.consecutiveVoiceFrames = 0;
}
/**
 * Heuristically checks whether text looks like one of our own TTS prompts.
 */
isTTSText(text) {
const lowerText = text.toLowerCase().trim();
// Padrões comuns de TTS em português
const ttsPatterns = [
/^(olá|oi|bem.?vindo)/,
/^(por favor|obrigad[oa])/,
/como posso ajud.*/,
/^(aguarde|um momento)/,
/^(desculpe|perdão)/,
];
return ttsPatterns.some(pattern => pattern.test(lowerText));
}
/**
 * Checks whether text is filler noise or repeats the previous result.
 */
isRepetitiveText(text) {
const commonNoiseTexts = [
"e aí",
"aí",
"ah",
"oh",
"hm",
"hmm"
];
const lowerText = text.toLowerCase().trim();
if (commonNoiseTexts.includes(lowerText)) {
console.debug(`GroqProvider: Detected noise text: "${lowerText}"`);
return true;
}
if (this.results.length >= 1) {
const lastText = this.results[this.results.length - 1].text.toLowerCase().trim();
if (lastText === lowerText) {
console.debug(`GroqProvider: Detected repetitive text: "${lowerText}"`);
return true;
}
}
return false;
}
/**
 * Writes the audio buffer to a WAV file (decoding MULAW when needed).
 */
async convertToWav(audioBuffer, outputPath) {
return new Promise((resolve, reject) => {
const writer = new wav.FileWriter(outputPath, {
channels: 1,
sampleRate: this.config.sampleRateHertz,
bitDepth: 16,
});
writer.on("error", reject);
writer.on("done", resolve);
let pcmBuffer;
if (this.config.encoding === "MULAW") {
pcmBuffer = this.mulawToPcm(audioBuffer);
} else {
pcmBuffer = audioBuffer;
}
writer.write(pcmBuffer);
writer.end();
});
}
/**
 * Converts a MULAW buffer to 16-bit PCM.
 */
mulawToPcm(mulawBuffer) {
const pcmBuffer = Buffer.alloc(mulawBuffer.length * 2);
for (let i = 0; i < mulawBuffer.length; i++) {
const mulaw = mulawBuffer[i];
const pcm = this.mulawToPcmSample(mulaw);
pcmBuffer.writeInt16LE(pcm, i * 2);
}
return pcmBuffer;
}
/**
 * Converts a single MULAW sample to a 16-bit PCM sample (G.711 µ-law).
 */
mulawToPcmSample(mulaw) {
const BIAS = 0x84;
const CLIP = 32635;
mulaw = ~mulaw;
const sign = (mulaw & 0x80) ? -1 : 1;
const exponent = (mulaw >> 4) & 0x07;
const mantissa = mulaw & 0x0F;
let sample = mantissa << (exponent + 3);
sample += BIAS << exponent;
sample = sign * (sample - BIAS);
return Math.max(-CLIP, Math.min(CLIP, sample));
}
/**
* Sets the configuration to use on the recognition stream.
*/
setConfig(config) {
if (!config) {
return;
}
let update = {};
if (config.codec) {
if (!(config.codec.name in GroqProvider.encodings)) {
throw new Error("Codec '" + config.codec.name + "' not supported");
}
update.encoding = GroqProvider.encodings[config.codec.name];
update.sampleRateHertz = config.codec.sampleRate;
}
if (config.language) {
const languageMap = {
'pt-BR': 'pt',
'en-US': 'en',
'es-ES': 'es',
};
const mappedLanguage = languageMap[config.language] || config.language;
if (!GroqProvider.languages.includes(mappedLanguage)) {
throw new Error("Language '" + config.language + "' not supported");
}
update.languageCode = mappedLanguage;
}
this.config = {...this.config, ...update};
}
/**
* Starts the recognition stream.
*/
start(config) {
this.setConfig(config);
console.log("GroqProvider: Started with config:", this.config);
this.resetState();
this.lastProcessTime = 0;
while (this.writableCorked) {
this.uncork();
}
}
/**
* Stops the recognition stream.
*/
stop() {
if (this.audioBuffer.length >= MIN_AUDIO_BUFFER_SIZE && this.speechDetected) {
this.maybeProcessAudioChunk();
}
this.cork();
}
/**
* Restarts the recognition stream.
*/
restart(config) {
// Convenience: stop (flushing any pending speech) then start with the new config.
this.stop();
this.start(config);
}
/**
 * Enables/disables echo detection.
 */
setEchoDetection(enabled) {
this.echoDetectionEnabled = enabled;
console.log(`GroqProvider: Echo detection ${enabled ? 'enabled' : 'disabled'}`);
}
/**
 * Clears the reference audio buffer.
 */
clearReferenceBuffer() {
// Discard all accumulated TTS reference audio used for echo detection.
this.referenceAudioBuffer = Buffer.alloc(0);
console.debug("GroqProvider: Reference buffer cleared");
}
}
/**
 * Factory for speech providers.
 *
 * @param {string} name - Provider type name (currently only "groq")
 * @param {Object} options - Provider specific options
 * @return {GroqProvider} A speech provider instance.
 * @throws {Error} If the provider name is not supported.
 */
function getProvider(name, options) {
switch (name) {
case "groq":
return new GroqProvider(options);
default:
throw new Error("Unsupported speech provider '" + name + "'");
}
}
// Public API of this module.
module.exports = { getProvider };
/*
* Copyright 2022 Sangoma Technologies Corporation
* Kevin Harwell <[email protected]>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const EventEmitter = require("events");
const { WebSocketServer } = require("ws");
/*
* For server accepting clients implementer.
*
* Basic server public interface:
*
* function close() - shutdowns the server
* event connection(client) - triggered when a client connects
*
* Basic client public interface:
*
* function send(data, { binary: <boolean> }) - sends data to client
* event close() - triggered when a client closes
* event message(data, isBinary) - triggered when data is received
*/
const DEFAULT_PORT = 9099;
/**
 * @class WSServer.
 *
 * Wrapper around a websocket server. Starts listening on a given port, and
 * emits a "connection" event when a client connects.
 *
 * @extends EventEmitter
 */
class WSServer extends EventEmitter {
/**
 * Creates an instance of a Websocket server.
 *
 * @param {Object} [options] - websocket server specific options
 * @param {number} [options.port=9099] - Port to listen on
 */
constructor(options) {
super();
this.port = options && options.port || DEFAULT_PORT;
this.ws = new WebSocketServer({
port: this.port,
// Track clients so close() can disconnect each one explicitly.
clientTracking: true,
});
this.ws.on("listening", () => {
console.info("Server on port '" + this.port + "': started listening");
});
this.ws.on("close", () => {
console.info("Server on port '" + this.port + "': stopped listening");
});
this.ws.on("error", (error) => {
console.error(error);
});
this.ws.on("connection", (client) => {
console.info("Server on port '" + this.port + "': client connected");
/**
 * Client connect event.
 *
 * @event WSServer#connection
 * @type {object}
 */
this.emit("connection", client);
});
}
/**
 * Close/Stop the server, disconnecting all tracked clients first.
 */
close() {
for (let client of this.ws.clients) {
console.log("WSServer: close client");
client.close();
}
this.ws.close((error) => {
// BUGFIX: previously logged "error undefined" on every clean shutdown;
// only report when ws actually passes an error to the callback.
if (error) {
console.error("WSServer: close error: " + error);
}
});
}
}
/**
 * Factory for servers.
 *
 * @param {string} name - A server type name (currently only "ws")
 * @param {Object} options - Server specific options
 * @return {WSServer} A server instance.
 * @throws {Error} If the server type is not supported.
 */
function getServer(name, options) {
// Loose comparison kept to match the module's existing call semantics.
if (name != "ws") {
throw new Error("Unsupported server type '" + name + "'");
}
return new WSServer(options);
}
// Public API of this module.
module.exports = { getServer };
/*
* Copyright 2022 Sangoma Technologies Corporation
* Kevin Harwell <[email protected]>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Checks whether two objects are equal, treating null/undefined as never
 * equal to anything.
 *
 * @param {Object} obj1
 * @param {Object} obj2
 * @param {requestCallback} [eq] - Optional equality callback, consulted only
 *     when the objects are not already loosely equal.
 * @return {boolean} true when equal (or whatever `eq` returns), otherwise false.
 */
function equals(obj1, obj2, eq) {
// `== null` matches both null and undefined.
if (obj1 == null || obj2 == null) {
return false;
}
if (obj1 == obj2) {
return true;
}
return eq && eq(obj1, obj2);
}
/**
 * Builds the list of objects from objs1 that match at least one object in
 * objs2, preserving objs1 order.
 *
 * @param {Object[]} objs1
 * @param {Object[]} objs2
 * @param {requestCallback} [eq] - Object equality callback
 * @return {Object[]} An intersected list of objects.
 */
function intersect(objs1, objs2, eq) {
// Each element of objs1 is emitted once PER matching element of objs2, so
// duplicate matches in objs2 yield duplicate output entries (pairwise scan).
return objs1.reduce((res, a) => {
for (const b of objs2) {
if (equals(a, b, eq)) {
res.push(a);
}
}
return res;
}, []);
}
/**
 * Finds and returns the first matching object between two lists. Either
 * argument may be a single object instead of an array.
 *
 * @param {Object|Object[]} objs1
 * @param {Object|Object[]} objs2
 * @param {requestCallback} [eq] - Object equality callback
 * @return {Object} The first matching object, or undefined when none match.
 */
function first(objs1, objs2, eq) {
const list1 = Array.isArray(objs1) ? objs1 : [objs1];
const list2 = Array.isArray(objs2) ? objs2 : [objs2];
return intersect(list1, list2, eq)[0];
}
// Public API of this module.
module.exports = { first, intersect };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment