Skip to content

Instantly share code, notes, and snippets.

@Nikkely
Created March 20, 2026 22:33
Show Gist options
  • Select an option

  • Save Nikkely/408ac91bc82d64bbdeb2cd3c22d70384 to your computer and use it in GitHub Desktop.

Select an option

Save Nikkely/408ac91bc82d64bbdeb2cd3c22d70384 to your computer and use it in GitHub Desktop.
twilio-emultator-client.py
#!/usr/bin/env python3
"""Local audio client that speaks Twilio Media Streams protocol over WebSocket."""
import argparse
import asyncio
import base64
import json
import signal
import sys
import threading
import time
import uuid
try:
import audioop
except ImportError:
import audioop_lts as audioop
import pyaudio
import websockets
STREAM_SID = f"LOCAL{uuid.uuid4().hex[:27]}"
ACCOUNT_SID = "AC0000000000000000000000000000local"
CALL_SID = "CA0000000000000000000000000000local"
RATE = 8000
CHANNELS = 1
CHUNK = 160 # 20ms at 8kHz
def mic_capture(audio_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, stop_event: threading.Event):
pa = pyaudio.PyAudio()
stream = pa.open(format=pyaudio.paInt16, channels=CHANNELS, rate=RATE,
input=True, frames_per_buffer=CHUNK)
try:
while not stop_event.is_set():
pcm = stream.read(CHUNK, exception_on_overflow=False)
mulaw = audioop.lin2ulaw(pcm, 2)
payload = base64.b64encode(mulaw).decode("ascii")
loop.call_soon_threadsafe(audio_queue.put_nowait, payload)
finally:
stream.stop_stream()
stream.close()
pa.terminate()
def speaker_playback(playback_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, stop_event: threading.Event):
pa = pyaudio.PyAudio()
stream = pa.open(format=pyaudio.paInt16, channels=CHANNELS, rate=RATE,
output=True, frames_per_buffer=CHUNK)
try:
while not stop_event.is_set():
try:
payload = playback_queue.get_nowait()
except asyncio.QueueEmpty:
time.sleep(0.005)
continue
mulaw = base64.b64decode(payload)
pcm = audioop.ulaw2lin(mulaw, 2)
stream.write(pcm)
finally:
stream.stop_stream()
stream.close()
pa.terminate()
def flush_queue(q: asyncio.Queue):
while not q.empty():
try:
q.get_nowait()
except asyncio.QueueEmpty:
break
async def run(host: str, port: int, token: str | None = None):
path = "/twilio/media-stream"
if token:
path += f"?token={token}"
uri = f"ws://{host}:{port}{path}"
print(f"[AudioClient] Connecting to {uri} ...")
loop = asyncio.get_event_loop()
audio_queue: asyncio.Queue = asyncio.Queue()
playback_queue: asyncio.Queue = asyncio.Queue()
stop_event = threading.Event()
async with websockets.connect(uri) as ws:
print("[AudioClient] Connected!")
mic_thread = threading.Thread(target=mic_capture, args=(audio_queue, loop, stop_event), daemon=True)
speaker_thread = threading.Thread(target=speaker_playback, args=(playback_queue, loop, stop_event), daemon=True)
mic_thread.start()
speaker_thread.start()
# connected
await ws.send(json.dumps({
"event": "connected",
"protocol": "Call",
"version": "1.0.0",
}))
print("[AudioClient] → connected")
# start
await ws.send(json.dumps({
"event": "start",
"sequenceNumber": "1",
"streamSid": STREAM_SID,
"start": {
"streamSid": STREAM_SID,
"accountSid": ACCOUNT_SID,
"callSid": CALL_SID,
"tracks": ["inbound"],
"mediaFormat": {
"encoding": "audio/x-mulaw",
"sampleRate": RATE,
"channels": CHANNELS,
},
},
}))
print("[AudioClient] → start")
seq = 2
chunk_num = 1
shutdown = asyncio.Event()
def request_shutdown():
stop_event.set()
shutdown.set()
loop.add_signal_handler(signal.SIGINT, request_shutdown)
async def send_media():
nonlocal seq, chunk_num
while not shutdown.is_set():
try:
payload = await asyncio.wait_for(audio_queue.get(), timeout=0.1)
except asyncio.TimeoutError:
continue
timestamp = str(chunk_num * 20)
await ws.send(json.dumps({
"event": "media",
"sequenceNumber": str(seq),
"streamSid": STREAM_SID,
"media": {
"track": "inbound",
"chunk": str(chunk_num),
"timestamp": timestamp,
"payload": payload,
},
}))
seq += 1
chunk_num += 1
async def receive_events():
try:
async for message in ws:
data = json.loads(message)
event = data.get("event")
if event == "media":
media_payload = data.get("media", {}).get("payload")
if media_payload:
playback_queue.put_nowait(media_payload)
elif event == "clear":
flush_queue(playback_queue)
print("[AudioClient] ← clear (flushed playback)")
else:
print(f"[AudioClient] ← {event}")
except websockets.ConnectionClosed:
request_shutdown()
sender = asyncio.create_task(send_media())
receiver = asyncio.create_task(receive_events())
await shutdown.wait()
print("\n[AudioClient] Shutting down ...")
# stop
await ws.send(json.dumps({
"event": "stop",
"sequenceNumber": str(seq),
"streamSid": STREAM_SID,
"stop": {
"accountSid": ACCOUNT_SID,
"callSid": CALL_SID,
},
}))
print("[AudioClient] → stop")
sender.cancel()
receiver.cancel()
try:
await sender
except asyncio.CancelledError:
pass
try:
await receiver
except asyncio.CancelledError:
pass
stop_event.set()
print("[AudioClient] Done.")
def main():
parser = argparse.ArgumentParser(description="Local audio client for Twilio Media Streams protocol")
parser.add_argument("--host", default="localhost")
parser.add_argument("--port", type=int, default=8080)
args = parser.parse_args()
asyncio.run(run(args.host, args.port))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment