Created
March 20, 2026 22:33
-
-
Save Nikkely/408ac91bc82d64bbdeb2cd3c22d70384 to your computer and use it in GitHub Desktop.
twilio-emultator-client.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Local audio client that speaks Twilio Media Streams protocol over WebSocket.""" | |
| import argparse | |
| import asyncio | |
| import base64 | |
| import json | |
| import signal | |
| import sys | |
| import threading | |
| import time | |
| import uuid | |
| try: | |
| import audioop | |
| except ImportError: | |
| import audioop_lts as audioop | |
| import pyaudio | |
| import websockets | |
# Identifiers reported in the emulated call's "start" / "media" / "stop" events.
# The stream SID is randomised on every run; the account and call SIDs are fixed.
STREAM_SID = "LOCAL" + uuid.uuid4().hex[:27]
ACCOUNT_SID = "AC0000000000000000000000000000local"
CALL_SID = "CA0000000000000000000000000000local"

# Audio format shared by the capture and playback streams.
RATE = 8000  # samples per second
CHANNELS = 1  # mono
CHUNK = RATE * 20 // 1000  # 160 samples per frame, i.e. 20 ms at 8 kHz
def mic_capture(audio_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, stop_event: threading.Event):
    """Capture microphone audio and hand it to the event loop as base64 mu-law frames.

    Intended to run in a worker thread. Each iteration reads ``CHUNK`` samples
    of 16-bit PCM, converts them to mu-law, base64-encodes the result and
    schedules ``audio_queue.put_nowait(payload)`` on ``loop`` through
    ``call_soon_threadsafe``.

    Args:
        audio_queue: Queue owned by ``loop`` that receives the encoded frames.
        loop: Event loop on which the queue insertion is scheduled.
        stop_event: Set by the caller to make the capture loop exit.
    """
    pa = pyaudio.PyAudio()
    try:
        # Opening the device can fail (e.g. no input device); the outer
        # ``finally`` still releases PortAudio in that case.
        stream = pa.open(format=pyaudio.paInt16, channels=CHANNELS, rate=RATE,
                         input=True, frames_per_buffer=CHUNK)
        try:
            while not stop_event.is_set():
                pcm = stream.read(CHUNK, exception_on_overflow=False)
                # ``read`` blocks for a whole frame; shutdown may have been
                # requested meanwhile, so do not schedule further work.
                if stop_event.is_set():
                    break
                mulaw = audioop.lin2ulaw(pcm, 2)
                payload = base64.b64encode(mulaw).decode("ascii")
                try:
                    loop.call_soon_threadsafe(audio_queue.put_nowait, payload)
                except RuntimeError:
                    # The event loop was closed while this thread was still
                    # running; nobody is left to consume frames.
                    break
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        pa.terminate()
def speaker_playback(playback_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, stop_event: threading.Event):
    """Play base64 mu-law payloads taken from ``playback_queue`` on the speaker.

    Intended to run in a worker thread. The queue is polled without blocking;
    when it is empty the thread sleeps 5 ms before polling again. Each payload
    is base64-decoded, expanded from mu-law to 16-bit PCM and written to the
    output stream. Payloads that cannot be decoded are reported and skipped.

    Args:
        playback_queue: Queue of base64-encoded mu-law payloads.
        loop: Accepted for symmetry with ``mic_capture``; not used here.
        stop_event: Set by the caller to make the playback loop exit.
    """
    # NOTE(review): asyncio.Queue is documented as not thread-safe, yet it is
    # polled here from a worker thread. Consider routing reads through the
    # event loop or switching to queue.Queue together with the producer.
    pa = pyaudio.PyAudio()
    try:
        # Opening the device can fail (e.g. no output device); the outer
        # ``finally`` still releases PortAudio in that case.
        stream = pa.open(format=pyaudio.paInt16, channels=CHANNELS, rate=RATE,
                         output=True, frames_per_buffer=CHUNK)
        try:
            while not stop_event.is_set():
                try:
                    payload = playback_queue.get_nowait()
                except asyncio.QueueEmpty:
                    time.sleep(0.005)
                    continue
                try:
                    mulaw = base64.b64decode(payload)
                except (ValueError, TypeError):
                    # binascii.Error is a ValueError. One bad frame from the
                    # peer must not terminate playback for the whole session.
                    print("[AudioClient] Dropping undecodable media payload", file=sys.stderr)
                    continue
                pcm = audioop.ulaw2lin(mulaw, 2)
                stream.write(pcm)
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        pa.terminate()
def flush_queue(q: asyncio.Queue):
    """Discard every item currently buffered in ``q`` without blocking."""
    try:
        # Keep popping until the queue reports that nothing is left.
        while True:
            q.get_nowait()
    except asyncio.QueueEmpty:
        return
async def run(host: str, port: int, token: str | None = None):
    """Stream microphone audio to a Media Streams server and play back its replies.

    Connects to ``ws://{host}:{port}/twilio/media-stream``, sends the
    ``connected`` and ``start`` events, then forwards microphone frames as
    ``media`` events while queueing inbound ``media`` payloads for the speaker
    thread. An inbound ``clear`` event flushes pending playback. The session
    ends on SIGINT or when the connection closes (for any reason); a ``stop``
    event is sent if the socket is still open.

    Args:
        host: Server host name or address.
        port: Server TCP port.
        token: Optional value sent as the ``token`` query parameter. It is
            percent-encoded before being placed in the URL.
    """
    # Function-scope import so this block needs no new file-level import.
    from urllib.parse import quote

    path = "/twilio/media-stream"
    if token:
        # Encode reserved characters (&, #, +, spaces, ...) so the token
        # cannot corrupt the query string.
        path += f"?token={quote(token, safe='')}"
    uri = f"ws://{host}:{port}{path}"
    print(f"[AudioClient] Connecting to {uri} ...")

    loop = asyncio.get_running_loop()
    audio_queue: asyncio.Queue = asyncio.Queue()
    playback_queue: asyncio.Queue = asyncio.Queue()
    stop_event = threading.Event()  # tells the audio threads to exit
    shutdown = asyncio.Event()  # wakes the coroutine below

    def request_shutdown():
        stop_event.set()
        shutdown.set()

    async with websockets.connect(uri) as ws:
        print("[AudioClient] Connected!")

        mic_thread = threading.Thread(target=mic_capture, args=(audio_queue, loop, stop_event), daemon=True)
        speaker_thread = threading.Thread(target=speaker_playback, args=(playback_queue, loop, stop_event), daemon=True)
        mic_thread.start()
        speaker_thread.start()

        # "start" uses sequence number 1, so media events begin at 2.
        seq = 2
        chunk_num = 1
        tasks: list[asyncio.Task] = []

        async def send_media():
            nonlocal seq, chunk_num
            try:
                while not shutdown.is_set():
                    try:
                        # Short timeout so the shutdown flag is re-checked
                        # even when the microphone produces nothing.
                        payload = await asyncio.wait_for(audio_queue.get(), timeout=0.1)
                    except asyncio.TimeoutError:
                        continue
                    timestamp = str(chunk_num * 20)
                    await ws.send(json.dumps({
                        "event": "media",
                        "sequenceNumber": str(seq),
                        "streamSid": STREAM_SID,
                        "media": {
                            "track": "inbound",
                            "chunk": str(chunk_num),
                            "timestamp": timestamp,
                            "payload": payload,
                        },
                    }))
                    seq += 1
                    chunk_num += 1
            except websockets.ConnectionClosed:
                pass
            finally:
                # Whatever ended this task, the session cannot continue.
                request_shutdown()

        async def receive_events():
            try:
                async for message in ws:
                    try:
                        data = json.loads(message)
                    except ValueError:
                        # Not JSON (or not decodable text): ignore the frame.
                        continue
                    if not isinstance(data, dict):
                        continue
                    event = data.get("event")
                    if event == "media":
                        media_payload = (data.get("media") or {}).get("payload")
                        if media_payload:
                            playback_queue.put_nowait(media_payload)
                    elif event == "clear":
                        flush_queue(playback_queue)
                        print("[AudioClient] ← clear (flushed playback)")
                    else:
                        print(f"[AudioClient] ← {event}")
            except websockets.ConnectionClosed:
                pass
            finally:
                # A normal close ends the ``async for`` without raising, so
                # shutdown must be requested on every exit path.
                request_shutdown()

        async def stop_tasks():
            # Idempotent: cancels and reaps whatever tasks are still tracked.
            for task in tasks:
                task.cancel()
            results = await asyncio.gather(*tasks, return_exceptions=True)
            tasks.clear()
            for result in results:
                # CancelledError is a BaseException and is skipped here.
                if isinstance(result, Exception):
                    print(f"[AudioClient] Task failed: {result!r}")

        try:
            # connected
            await ws.send(json.dumps({
                "event": "connected",
                "protocol": "Call",
                "version": "1.0.0",
            }))
            print("[AudioClient] → connected")

            # start
            await ws.send(json.dumps({
                "event": "start",
                "sequenceNumber": "1",
                "streamSid": STREAM_SID,
                "start": {
                    "streamSid": STREAM_SID,
                    "accountSid": ACCOUNT_SID,
                    "callSid": CALL_SID,
                    "tracks": ["inbound"],
                    "mediaFormat": {
                        "encoding": "audio/x-mulaw",
                        "sampleRate": RATE,
                        "channels": CHANNELS,
                    },
                },
            }))
            print("[AudioClient] → start")

            try:
                loop.add_signal_handler(signal.SIGINT, request_shutdown)
            except NotImplementedError:
                # Event loops without signal support (e.g. on Windows): Ctrl-C
                # then surfaces as KeyboardInterrupt and the ``finally`` below
                # still stops the audio threads.
                pass

            tasks.append(asyncio.create_task(send_media()))
            tasks.append(asyncio.create_task(receive_events()))

            await shutdown.wait()
            print("\n[AudioClient] Shutting down ...")

            # Stop the sender first so no media event can follow "stop" or
            # reuse its sequence number.
            await stop_tasks()

            # stop
            try:
                await ws.send(json.dumps({
                    "event": "stop",
                    "sequenceNumber": str(seq),
                    "streamSid": STREAM_SID,
                    "stop": {
                        "accountSid": ACCOUNT_SID,
                        "callSid": CALL_SID,
                    },
                }))
                print("[AudioClient] → stop")
            except websockets.ConnectionClosed:
                print("[AudioClient] Connection already closed; stop not sent")
        finally:
            stop_event.set()
            await stop_tasks()
            # Give the audio threads a bounded amount of time to release
            # their devices; join off the loop so it stays responsive.
            for worker in (mic_thread, speaker_thread):
                await asyncio.to_thread(worker.join, 1.0)

    print("[AudioClient] Done.")
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the audio client."""
    parser = argparse.ArgumentParser(description="Local audio client for Twilio Media Streams protocol")
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", type=int, default=8080)
    # ``run`` already accepts a token; expose it so it can be set from the CLI.
    parser.add_argument("--token", default=None,
                        help="value sent as the 'token' query parameter (omitted by default)")
    return parser


def main():
    """Parse command-line options and run the audio client until it shuts down."""
    args = _build_parser().parse_args()
    asyncio.run(run(args.host, args.port, args.token))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment