Last active
October 29, 2025 13:30
-
-
Save pkdavies/d552b02032c16200dad6dac175f7b86b to your computer and use it in GitHub Desktop.
Full-duplex GGWave command loop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Simple full-duplex GGWave command loop. | |
| - Listens continuously for incoming GGWave messages from the default microphone. | |
| - Decodes each message into a "command" (ASCII/UTF-8 text). | |
| - Computes a response string and transmits it back over audio using GGWave. | |
| - Stops only when the command "STOP" is received (case-insensitive). | |
| Author: CDS | |
| License: Commercial | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import queue | |
| import sys | |
| import time | |
| from datetime import datetime | |
| from typing import Optional | |
| import numpy as np | |
| import sounddevice as sd | |
| # GGWave python bindings | |
| # Source: https://github.com/ggerganov/ggwave/tree/master/examples/ggwave-py | |
| import ggwave | |
| # ---------------------------- | |
| # Configuration defaults | |
| # ---------------------------- | |
| DEFAULT_SAMPLE_RATE = 48000 # GGWave works well at 44.1k or 48k; we use 48k for consistency | |
| DEFAULT_TX_VOLUME = 50 # GGWave volume [0..100] | |
| # Choose a protocol. "AUDIBLE_FAST" is a good default; change to ULTRASOUND if your hardware allows. | |
| DEFAULT_PROTOCOL = ggwave.ProtocolId.GGWAVE_TX_PROTOCOL_AUDIBLE_FAST | |
| # ---------------------------- | |
| # Utilities | |
| # ---------------------------- | |
| def safe_decode(b: bytes) -> str: | |
| """Decode bytes as UTF-8 with fallback to Latin-1 to avoid hard failures.""" | |
| try: | |
| return b.decode("utf-8") | |
| except UnicodeDecodeError: | |
| return b.decode("latin-1", errors="replace") | |
| def build_response(command: str) -> str: | |
| """ | |
| Map an incoming command to a response string. | |
| You can replace this logic with your own command handler, database lookup, | |
| or application logic. Keep responses short (GGWave sends short strings best). | |
| """ | |
| cmd = command.strip().upper() | |
| # Special case handled by main loop: STOP | |
| if cmd == "PING": | |
| return "PONG" | |
| if cmd == "TIME": | |
| return datetime.now().strftime("TIME %H:%M:%S") | |
| if cmd == "DATE": | |
| return datetime.now().strftime("DATE %Y-%m-%d") | |
| if cmd.startswith("ECHO "): | |
| return command[5:].strip() or "OK" | |
| if cmd in {"HELLO", "HI"}: | |
| return "HELLO" | |
| if cmd == "HELP": | |
| return "CMDS: PING, TIME, DATE, ECHO <text>, STOP" | |
| # Default: acknowledge what we heard | |
| return f"ACK {command.strip()[:48]}" # keep it short-ish | |
| # ---------------------------- | |
| # GGWave I/O helpers | |
| # ---------------------------- | |
| class GGWaveCodec: | |
| """ | |
| Thin wrapper around the ggwave instance to encode (tx) and decode (rx) samples. | |
| This class: | |
| - Initializes the GGWave context with default parameters. | |
| - Encodes text into PCM int16 for playback via sounddevice. | |
| - Accepts captured int16 audio and attempts to decode text messages. | |
| """ | |
| def __init__(self, sample_rate: int = DEFAULT_SAMPLE_RATE): | |
| self.sample_rate = sample_rate | |
| # Initialize GGWave instance with default parameters and our sample rate | |
| params = ggwave.getDefaultParameters() | |
| # Ensure GGWave runs at our chosen audio sample rate | |
| params.sampleRate = float(sample_rate) | |
| self._ctx = ggwave.init(params) | |
| # Create and cache a reusable RX workspace (optional but efficient) | |
| self._rx_state = ggwave.State() | |
| def encode_text(self, text: str, | |
| protocol: ggwave.ProtocolId = DEFAULT_PROTOCOL, | |
| volume: int = DEFAULT_TX_VOLUME) -> np.ndarray: | |
| """ | |
| Encode text into PCM int16 samples suitable for playback. | |
| Returns: | |
| numpy array of shape (N,) dtype=int16 | |
| """ | |
| # ggwave.encode() returns raw bytes for 16-bit little-endian PCM | |
| pcm_bytes = ggwave.encode(self._ctx, | |
| text.encode("utf-8"), | |
| protocol, | |
| volume, | |
| self.sample_rate) | |
| pcm = np.frombuffer(pcm_bytes, dtype=np.int16) | |
| return pcm | |
| def try_decode(self, pcm_chunk: np.ndarray) -> Optional[str]: | |
| """ | |
| Try to decode a GGWave message from a chunk of PCM int16 samples. | |
| Returns: | |
| The decoded string if a complete message is recovered, else None. | |
| """ | |
| if pcm_chunk.size == 0: | |
| return None | |
| # Prepare bytes for the decoder (16-bit LE PCM) | |
| raw = pcm_chunk.astype(np.int16, copy=False).tobytes() | |
| # decode() returns bytes (message) or None / empty if nothing was decoded | |
| msg = ggwave.decode(self._ctx, raw, self._rx_state) | |
| if not msg: | |
| return None | |
| return safe_decode(msg) | |
| def close(self): | |
| """Free GGWave resources.""" | |
| ggwave.free(self._ctx) | |
| # ---------------------------- | |
| # Audio I/O (sounddevice) | |
| # ---------------------------- | |
| class AudioLoop: | |
| """ | |
| Manages the microphone input stream and speaker output for GGWave. | |
| - Captures mono int16 frames from the default input device. | |
| - Feeds frames into the GGWave decoder and pushes any decoded messages onto a queue. | |
| - Provides a synchronous transmit (playback) helper for responses. | |
| """ | |
| def __init__(self, | |
| codec: GGWaveCodec, | |
| samplerate: int, | |
| blocksize: int = 1024, | |
| device_in: Optional[int | str] = None, | |
| device_out: Optional[int | str] = None): | |
| self.codec = codec | |
| self.samplerate = samplerate | |
| self.blocksize = blocksize | |
| self.device_in = device_in | |
| self.device_out = device_out | |
| self._messages = queue.Queue() # decoded text messages | |
| # Configure default devices (optional) | |
| if device_in is not None or device_out is not None: | |
| sd.default.device = (device_in, device_out) | |
| # We capture mono int16 to keep things simple and efficient | |
| self._in_stream = sd.InputStream( | |
| samplerate=self.samplerate, | |
| channels=1, | |
| dtype="int16", | |
| blocksize=self.blocksize, | |
| callback=self._on_audio_in, | |
| ) | |
| # ------------- input ------------- | |
| def _on_audio_in(self, indata, frames, time_info, status): | |
| """sounddevice callback: decode any GGWave message from incoming audio.""" | |
| if status: | |
| # Over/underflows can happen; log to stderr but keep going | |
| print(f"[audio warning] {status}", file=sys.stderr) | |
| # indata: shape (frames, channels=1), dtype=int16 | |
| pcm = np.squeeze(indata) # (frames,) | |
| text = self.codec.try_decode(pcm) | |
| if text: | |
| self._messages.put(text) | |
| def start(self): | |
| """Start capturing from the microphone.""" | |
| self._in_stream.start() | |
| def stop(self): | |
| """Stop capturing from the microphone.""" | |
| self._in_stream.stop() | |
| def get_message_nowait(self) -> Optional[str]: | |
| """Non-blocking read of a decoded message (if any).""" | |
| try: | |
| return self._messages.get_nowait() | |
| except queue.Empty: | |
| return None | |
| # ------------- output ------------- | |
| def transmit(self, text: str): | |
| """Encode and synchronously play a GGWave message on the default speaker.""" | |
| pcm = self.codec.encode_text(text) | |
| # Play mono int16 at the same sample rate we used for encoding | |
| sd.play(pcm, samplerate=self.samplerate, blocking=True, device=self.device_out) | |
| # ---------------------------- | |
| # Main loop | |
| # ---------------------------- | |
| def run_loop(args): | |
| """Main run loop until we receive the command STOP.""" | |
| codec = GGWaveCodec(sample_rate=args.samplerate) | |
| audio = AudioLoop(codec, samplerate=args.samplerate, | |
| blocksize=args.blocksize, | |
| device_in=args.input_device, | |
| device_out=args.output_device) | |
| print(f"[ggwave] listening at {args.samplerate} Hz; say HELP for commands; say STOP to exit.") | |
| audio.start() | |
| try: | |
| # Optionally announce readiness | |
| if args.tx_ready: | |
| time.sleep(0.2) # small delay to avoid talking over ourselves | |
| audio.transmit("READY") | |
| while True: | |
| msg = audio.get_message_nowait() | |
| if msg is None: | |
| time.sleep(0.01) # keep loop snappy but light | |
| continue | |
| cleaned = msg.strip() | |
| print(f"[rx] {cleaned}") | |
| if cleaned.upper() == "STOP": | |
| audio.transmit("BYE") | |
| break | |
| # Compute and send a response | |
| response = build_response(cleaned) | |
| print(f"[tx] {response}") | |
| audio.transmit(response) | |
| except KeyboardInterrupt: | |
| print("\n[ggwave] interrupted by user") | |
| finally: | |
| audio.stop() | |
| codec.close() | |
| sd.stop() # ensure any lingering playback is halted | |
| print("[ggwave] closed") | |
| # ---------------------------- | |
| # CLI | |
| # ---------------------------- | |
| def parse_args(argv=None): | |
| p = argparse.ArgumentParser( | |
| description="GGWave command listener/transmitter loop." | |
| ) | |
| p.add_argument("--samplerate", type=int, default=DEFAULT_SAMPLE_RATE, | |
| help=f"Audio sample rate in Hz (default: {DEFAULT_SAMPLE_RATE})") | |
| p.add_argument("--blocksize", type=int, default=1024, | |
| help="Microphone block size in frames (default: 1024)") | |
| p.add_argument("--input-device", type=str, default=None, | |
| help="sounddevice input device name or index (default: system default)") | |
| p.add_argument("--output-device", type=str, default=None, | |
| help="sounddevice output device name or index (default: system default)") | |
| p.add_argument("--tx-ready", action="store_true", | |
| help='Transmit a short "READY" tone/message on startup') | |
| return p.parse_args(argv) | |
| if __name__ == "__main__": | |
| args = parse_args() | |
| run_loop(args) |
Author
Author
sudo apt update
sudo apt install -y build-essential cmake portaudio19-dev
python3 -m venv .venv
source .venv/bin/activate
pip install -U pip setuptools wheel "cython>=3.0.10" numpy
git clone https://github.com/ggerganov/ggwave.git
cd ggwave/examples/ggwave-py
pip install .
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
deps for audio I/O
sudo apt-get update && sudo apt-get install -y python3.11-venv portaudio19-dev
venv
python3.11 -m venv .venv311
source .venv311/bin/activate
toolchain + libs
pip install -U pip setuptools wheel
pip install numpy sounddevice ggwave==0.4.2
python -c "import ggwave, sounddevice; print('ggwave OK')"