Created January 29, 2024 15:22
-
-
Save c0bra/bacadd4ed1de2d18d58307458f403d66 to your computer and use it in GitHub Desktop.
Attempt: Twilio->Deepgram live streaming transcription in Node.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs from 'fs' | |
import Stream from 'node:stream' | |
import WaveFile from 'wavefile'; | |
import { WebSocketServer } from 'ws'; | |
import { createClient, LiveTranscriptionEvents } from '@deepgram/sdk'; | |
// Deepgram SDK client. The key comes from DEEPGRAM_API_KEY; the misspelled
// DEEEPGRAM_API_KEY that this script originally read is still accepted as a
// fallback so existing deployments keep working.
const deepgram = createClient(process.env.DEEPGRAM_API_KEY ?? process.env.DEEEPGRAM_API_KEY);

// Log promise rejections that nobody handled instead of letting Node's default
// behaviour terminate the process.
process.on('unhandledRejection', (reason, promise) => {
  console.error(reason, 'Unhandled Rejection at Promise', promise);
});

// Stream that audio destined for Deepgram is written to. It is assigned per
// connection and on reconnect, and is shared by every connection, so only one
// call at a time is supported.
let transformStream;

websocket_server();
/**
 * Starts a WebSocket server on port 6000 that accepts Twilio Media Streams
 * connections on the `/twilio` path and forwards the call audio to Deepgram.
 *
 * For each connection it queues the mu-law `inbound` and `outbound` tracks,
 * padding gaps between Twilio media timestamps with mu-law silence. Once both
 * tracks hold BUFFER_SIZE bytes and the Deepgram stream is ready, it decodes
 * them to 16-bit PCM and interleaves them into a stereo WAV chunk. It writes
 * that chunk to the module-level `transformStream` obtained from
 * `deepgram_connect()`. When the socket closes, the Deepgram stream is ended
 * and the decoded audio of the whole call is saved to `all.wav`.
 *
 * NOTE(review): `transformStream` is a single module-level variable, so
 * concurrent calls are not supported; a second connection takes over the
 * stream of the first.
 *
 * @returns {WebSocketServer} the listening server.
 */
function websocket_server() {
  const wss = new WebSocketServer({ port: 6000 });

  wss.on('connection', async function connection(ws, req) {
    if (req.url !== '/twilio') {
      console.log("Invalid URL, must be /twilio");
      // ws.send() accepts only strings/binary data; a plain object throws.
      ws.send(JSON.stringify({ error: "Invalid URL, must be /twilio" }));
      ws.close();
      return;
    }
    console.log("New connection from ", req.socket.remoteAddress);

    // Twilio sends audio as 160-byte mu-law messages holding 20 ms each
    // (8-bit samples at 8000 Hz, i.e. 8 bytes per ms).
    const MS_PER_MESSAGE = 20;
    const BYTES_PER_MS = 8;
    const BYTES_PER_MESSAGE = MS_PER_MESSAGE * BYTES_PER_MS;
    // Buffer 20 messages (0.4 s of audio) per track before forwarding, to
    // improve throughput.
    const BUFFERED_MESSAGES = 20;
    const BUFFER_SIZE = BUFFERED_MESSAGES * BYTES_PER_MESSAGE;

    let inbuffer = Buffer.alloc(0);
    let outbuffer = Buffer.alloc(0);
    let inboundChunksStarted = false;
    let outboundChunksStarted = false;
    let latestInboundTimestamp = 0;
    let latestOutboundTimestamp = 0;
    // Decoded PCM per forwarded chunk, kept for the end-of-call recording and
    // concatenated once on close (avoids re-copying the whole call each chunk).
    const inboundPcmChunks = [];
    const outboundPcmChunks = [];
    // Audio keeps accumulating in the buffers until Deepgram is connected.
    let deepgramReady = false;
    let closed = false;

    // Mu-law silence covering any time missing between the previous message
    // on a track and this one (dropped packets); empty when nothing is missing.
    const silenceGap = (previousTimestamp, timestamp) => {
      const missingMs = timestamp - (previousTimestamp + MS_PER_MESSAGE);
      // 0xff is silence for mu-law audio.
      return missingMs > 0 ? Buffer.alloc(missingMs * BYTES_PER_MS, 0xff) : Buffer.alloc(0);
    };

    // Sends every complete BUFFER_SIZE slice that both tracks have queued.
    const forwardBufferedAudio = () => {
      while (deepgramReady && inbuffer.length >= BUFFER_SIZE && outbuffer.length >= BUFFER_SIZE) {
        const inSamples = decodeMuLaw(inbuffer.subarray(0, BUFFER_SIZE));
        const outSamples = decodeMuLaw(outbuffer.subarray(0, BUFFER_SIZE));
        inboundPcmChunks.push(inSamples);
        outboundPcmChunks.push(outSamples);

        const mixed = new WaveFile.WaveFile();
        mixed.fromScratch(2, 8000, '16', [inSamples, outSamples]);
        // NOTE(review): each chunk is a complete WAV file, header included.
        // Deepgram may decode the headers after the first one as audio; if so,
        // send raw interleaved PCM with `encoding: 'linear16'` instead.
        transformStream.write(mixed.toBuffer());

        inbuffer = inbuffer.subarray(BUFFER_SIZE);
        outbuffer = outbuffer.subarray(BUFFER_SIZE);
      }
    };

    ws.on('close', () => {
      closed = true;
      console.log("Twilio websocket connection closed");
      // Release the Deepgram session for this call.
      if (deepgramReady && !transformStream.writableEnded) transformStream.end();
      try {
        const allFile = new WaveFile.WaveFile();
        allFile.fromScratch(2, 8000, '16', [
          concatFloat32(inboundPcmChunks),
          concatFloat32(outboundPcmChunks),
        ]);
        fs.writeFileSync('all.wav', allFile.toBuffer(), 'binary');
      } catch (err) {
        console.error('Failed to write all.wav', err);
      }
    });

    ws.on('message', (msg) => {
      let data;
      try {
        data = JSON.parse(Buffer.from(msg).toString());
      } catch (err) {
        // A bad frame must not crash the whole server.
        console.error('Ignoring malformed Twilio message', err);
        return;
      }

      if (data.event === 'connected') {
        return;
      }
      if (data.event === 'media') {
        const { media } = data;
        const timestamp = Number.parseInt(media.timestamp, 10);
        const chunk = Buffer.from(media.payload, 'base64');

        if (media.track === 'inbound') {
          if (!inboundChunksStarted) {
            inboundChunksStarted = true;
            // The first inbound chunk never needs padding.
            latestInboundTimestamp = timestamp - MS_PER_MESSAGE;
            // Start the outbound track at the same point, unless outbound
            // audio has already arrived and set its own timeline.
            if (!outboundChunksStarted) latestOutboundTimestamp = timestamp - MS_PER_MESSAGE;
          }
          inbuffer = Buffer.concat([inbuffer, silenceGap(latestInboundTimestamp, timestamp), chunk]);
          latestInboundTimestamp = timestamp;
        } else if (media.track === 'outbound') {
          outboundChunksStarted = true;
          outbuffer = Buffer.concat([outbuffer, silenceGap(latestOutboundTimestamp, timestamp), chunk]);
          latestOutboundTimestamp = timestamp;
        }
      }
      // 'start' and 'stop' events need no handling beyond forwarding.
      // NOTE(review): nothing is forwarded unless Twilio sends BOTH tracks.
      forwardBufferedAudio();
    });

    ws.on('error', console.error);

    // Handlers are registered before connecting so Twilio messages arriving
    // during the Deepgram handshake are buffered instead of dropped.
    try {
      transformStream = await deepgram_connect();
    } catch (err) {
      console.error('Could not connect to Deepgram', err);
      ws.close();
      return;
    }
    if (closed) {
      // The call ended while Deepgram was still connecting.
      transformStream.end();
      return;
    }
    deepgramReady = true;
    forwardBufferedAudio();
  });

  console.log("Websocket server listening on port 6000\n");
  return wss;
}

/**
 * Decodes 8 kHz mono mu-law bytes into 16-bit-range PCM samples.
 * @param {Buffer} bytes - mu-law encoded audio.
 * @returns {Float32Array} samples with 16-bit integer values.
 */
function decodeMuLaw(bytes) {
  const wav = new WaveFile.WaveFile();
  wav.fromScratch(1, 8000, '8m', bytes);
  wav.fromMuLaw();
  return wav.getSamples(true, Float32Array);
}

/**
 * Concatenates Float32Array chunks into one array.
 * @param {Float32Array[]} chunks
 * @returns {Float32Array}
 */
function concatFloat32(chunks) {
  const total = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
  const out = new Float32Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    out.set(chunk, offset);
    offset += chunk.length;
  }
  return out;
}
/**
 * Opens a Deepgram live-transcription connection (8 kHz, 2 channels,
 * multichannel) and, once it is open, resolves with a Transform stream.
 * Every chunk written to the stream is sent to Deepgram. Ending the stream
 * asks Deepgram to finish the session. Transcripts, errors and warnings are
 * logged to the console.
 *
 * The stream is also stored in the module-level `transformStream`. If a write
 * arrives after the Deepgram socket has started closing, one replacement
 * connection is opened, which then takes over `transformStream`.
 *
 * @returns {Promise<Stream.Transform>} resolves when the connection opens;
 *   rejects if it errors or closes before opening.
 */
async function deepgram_connect() {
  console.log("Connecting to Deepgram...");
  const connection = deepgram.listen.live({
    sample_rate: 8000,
    channels: 2,
    multichannel: true,
    // NOTE(review): with no `encoding`, Deepgram has to detect the format from
    // the WAV header at the start of the stream. If the writer sends a full WAV
    // file per chunk, later headers may be decoded as audio; consider sending
    // raw PCM with `encoding: 'linear16'`.
    // encoding: 'mulaw',
  });

  let keepAlive;
  let opened = false;
  let reconnecting = false;

  // Opens a replacement connection once this one has started closing. The
  // guard ensures a burst of writes to the dead stream starts only one attempt.
  const reconnect = () => {
    if (reconnecting) return;
    reconnecting = true;
    console.log("socket: retrying connection to deepgram");
    clearInterval(keepAlive);
    // Listeners on the old connection stay attached on purpose, so a late
    // 'error' event from it is still handled rather than thrown.
    deepgram_connect()
      .then(() => console.log("deepgram: reconnected"))
      .catch((err) => {
        console.error("deepgram: reconnect failed", err);
        reconnecting = false; // let a later write try again
      });
  };

  // Transform callback: forwards one written chunk to Deepgram. The callback
  // is always invoked so the stream never stalls.
  const sendChunk = (chunk, _encoding, callback) => {
    const state = connection.getReadyState();
    if (state === 1 /* OPEN */) {
      try {
        connection.send(chunk);
      } catch (err) {
        console.error("socket: failed to send data to deepgram", err);
      }
    } else {
      console.log("socket: data couldn't be sent to deepgram");
      if (state >= 2 /* 2 = CLOSING, 3 = CLOSED */) reconnect();
    }
    callback();
  };

  return new Promise((resolve, reject) => {
    // Registered before Open so handshake failures settle the promise instead
    // of leaving it pending (and so an 'error' event always has a listener).
    connection.on(LiveTranscriptionEvents.Error, (error) => {
      console.error(error);
      if (!opened) {
        reject(error instanceof Error ? error : new Error('Deepgram connection error', { cause: error }));
      }
    });
    connection.on(LiveTranscriptionEvents.Warning, (warning) => {
      console.error(warning);
    });
    connection.on(LiveTranscriptionEvents.Close, (evt) => {
      console.log(`Deepgram connection closed: [${evt.code}] ${evt.reason}`);
      clearInterval(keepAlive);
      if (!opened) {
        reject(new Error(`Deepgram connection closed before opening: [${evt.code}] ${evt.reason}`));
      }
    });
    connection.on(LiveTranscriptionEvents.Transcript, (data) => {
      console.log('Transcript:', data.channel.alternatives);
    });

    connection.on(LiveTranscriptionEvents.Open, () => {
      opened = true;
      // Keep-alives start only once there is an open socket to send them on.
      keepAlive = setInterval(() => {
        console.log("deepgram: keepalive");
        connection.keepAlive();
      }, 10 * 1000);

      const stream = new Stream.Transform({
        transform: sendChunk,
        flush(callback) {
          // The writer ended the stream: no more audio is coming.
          if (connection.getReadyState() === 1) connection.finish();
          callback();
        },
      });
      stream.on('end', () => {
        console.log("transform-stream: end");
      });
      stream.on('error', (err) => {
        console.error("transform-stream: error", err);
      });

      transformStream = stream;
      resolve(stream);
    });
  });
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment