Skip to content

Instantly share code, notes, and snippets.

@pkdavies
Last active October 29, 2025 13:30
Show Gist options
  • Select an option

  • Save pkdavies/d552b02032c16200dad6dac175f7b86b to your computer and use it in GitHub Desktop.

Select an option

Save pkdavies/d552b02032c16200dad6dac175f7b86b to your computer and use it in GitHub Desktop.
Full-duplex GGWave command loop
#!/usr/bin/env python3
"""
Simple full-duplex GGWave command loop.
- Listens continuously for incoming GGWave messages from the default microphone.
- Decodes each message into a "command" (ASCII/UTF-8 text).
- Computes a response string and transmits it back over audio using GGWave.
- Stops only when the command "STOP" is received (case-insensitive).
Author: CDS
License: Commercial
"""
from __future__ import annotations
import argparse
import queue
import sys
import time
from datetime import datetime
from typing import Optional
import numpy as np
import sounddevice as sd
# GGWave python bindings
# Source: https://github.com/ggerganov/ggwave/tree/master/examples/ggwave-py
import ggwave
# ----------------------------
# Configuration defaults
# ----------------------------
DEFAULT_SAMPLE_RATE = 48000 # GGWave works well at 44.1k or 48k; we use 48k for consistency
DEFAULT_TX_VOLUME = 50 # GGWave volume [0..100]
# Choose a protocol. "AUDIBLE_FAST" is a good default; change to ULTRASOUND if your hardware allows.
DEFAULT_PROTOCOL = ggwave.ProtocolId.GGWAVE_TX_PROTOCOL_AUDIBLE_FAST
# ----------------------------
# Utilities
# ----------------------------
def safe_decode(b: bytes) -> str:
"""Decode bytes as UTF-8 with fallback to Latin-1 to avoid hard failures."""
try:
return b.decode("utf-8")
except UnicodeDecodeError:
return b.decode("latin-1", errors="replace")
def build_response(command: str) -> str:
"""
Map an incoming command to a response string.
You can replace this logic with your own command handler, database lookup,
or application logic. Keep responses short (GGWave sends short strings best).
"""
cmd = command.strip().upper()
# Special case handled by main loop: STOP
if cmd == "PING":
return "PONG"
if cmd == "TIME":
return datetime.now().strftime("TIME %H:%M:%S")
if cmd == "DATE":
return datetime.now().strftime("DATE %Y-%m-%d")
if cmd.startswith("ECHO "):
return command[5:].strip() or "OK"
if cmd in {"HELLO", "HI"}:
return "HELLO"
if cmd == "HELP":
return "CMDS: PING, TIME, DATE, ECHO <text>, STOP"
# Default: acknowledge what we heard
return f"ACK {command.strip()[:48]}" # keep it short-ish
# ----------------------------
# GGWave I/O helpers
# ----------------------------
class GGWaveCodec:
"""
Thin wrapper around the ggwave instance to encode (tx) and decode (rx) samples.
This class:
- Initializes the GGWave context with default parameters.
- Encodes text into PCM int16 for playback via sounddevice.
- Accepts captured int16 audio and attempts to decode text messages.
"""
def __init__(self, sample_rate: int = DEFAULT_SAMPLE_RATE):
self.sample_rate = sample_rate
# Initialize GGWave instance with default parameters and our sample rate
params = ggwave.getDefaultParameters()
# Ensure GGWave runs at our chosen audio sample rate
params.sampleRate = float(sample_rate)
self._ctx = ggwave.init(params)
# Create and cache a reusable RX workspace (optional but efficient)
self._rx_state = ggwave.State()
def encode_text(self, text: str,
protocol: ggwave.ProtocolId = DEFAULT_PROTOCOL,
volume: int = DEFAULT_TX_VOLUME) -> np.ndarray:
"""
Encode text into PCM int16 samples suitable for playback.
Returns:
numpy array of shape (N,) dtype=int16
"""
# ggwave.encode() returns raw bytes for 16-bit little-endian PCM
pcm_bytes = ggwave.encode(self._ctx,
text.encode("utf-8"),
protocol,
volume,
self.sample_rate)
pcm = np.frombuffer(pcm_bytes, dtype=np.int16)
return pcm
def try_decode(self, pcm_chunk: np.ndarray) -> Optional[str]:
"""
Try to decode a GGWave message from a chunk of PCM int16 samples.
Returns:
The decoded string if a complete message is recovered, else None.
"""
if pcm_chunk.size == 0:
return None
# Prepare bytes for the decoder (16-bit LE PCM)
raw = pcm_chunk.astype(np.int16, copy=False).tobytes()
# decode() returns bytes (message) or None / empty if nothing was decoded
msg = ggwave.decode(self._ctx, raw, self._rx_state)
if not msg:
return None
return safe_decode(msg)
def close(self):
"""Free GGWave resources."""
ggwave.free(self._ctx)
# ----------------------------
# Audio I/O (sounddevice)
# ----------------------------
class AudioLoop:
"""
Manages the microphone input stream and speaker output for GGWave.
- Captures mono int16 frames from the default input device.
- Feeds frames into the GGWave decoder and pushes any decoded messages onto a queue.
- Provides a synchronous transmit (playback) helper for responses.
"""
def __init__(self,
codec: GGWaveCodec,
samplerate: int,
blocksize: int = 1024,
device_in: Optional[int | str] = None,
device_out: Optional[int | str] = None):
self.codec = codec
self.samplerate = samplerate
self.blocksize = blocksize
self.device_in = device_in
self.device_out = device_out
self._messages = queue.Queue() # decoded text messages
# Configure default devices (optional)
if device_in is not None or device_out is not None:
sd.default.device = (device_in, device_out)
# We capture mono int16 to keep things simple and efficient
self._in_stream = sd.InputStream(
samplerate=self.samplerate,
channels=1,
dtype="int16",
blocksize=self.blocksize,
callback=self._on_audio_in,
)
# ------------- input -------------
def _on_audio_in(self, indata, frames, time_info, status):
"""sounddevice callback: decode any GGWave message from incoming audio."""
if status:
# Over/underflows can happen; log to stderr but keep going
print(f"[audio warning] {status}", file=sys.stderr)
# indata: shape (frames, channels=1), dtype=int16
pcm = np.squeeze(indata) # (frames,)
text = self.codec.try_decode(pcm)
if text:
self._messages.put(text)
def start(self):
"""Start capturing from the microphone."""
self._in_stream.start()
def stop(self):
"""Stop capturing from the microphone."""
self._in_stream.stop()
def get_message_nowait(self) -> Optional[str]:
"""Non-blocking read of a decoded message (if any)."""
try:
return self._messages.get_nowait()
except queue.Empty:
return None
# ------------- output -------------
def transmit(self, text: str):
"""Encode and synchronously play a GGWave message on the default speaker."""
pcm = self.codec.encode_text(text)
# Play mono int16 at the same sample rate we used for encoding
sd.play(pcm, samplerate=self.samplerate, blocking=True, device=self.device_out)
# ----------------------------
# Main loop
# ----------------------------
def run_loop(args):
"""Main run loop until we receive the command STOP."""
codec = GGWaveCodec(sample_rate=args.samplerate)
audio = AudioLoop(codec, samplerate=args.samplerate,
blocksize=args.blocksize,
device_in=args.input_device,
device_out=args.output_device)
print(f"[ggwave] listening at {args.samplerate} Hz; say HELP for commands; say STOP to exit.")
audio.start()
try:
# Optionally announce readiness
if args.tx_ready:
time.sleep(0.2) # small delay to avoid talking over ourselves
audio.transmit("READY")
while True:
msg = audio.get_message_nowait()
if msg is None:
time.sleep(0.01) # keep loop snappy but light
continue
cleaned = msg.strip()
print(f"[rx] {cleaned}")
if cleaned.upper() == "STOP":
audio.transmit("BYE")
break
# Compute and send a response
response = build_response(cleaned)
print(f"[tx] {response}")
audio.transmit(response)
except KeyboardInterrupt:
print("\n[ggwave] interrupted by user")
finally:
audio.stop()
codec.close()
sd.stop() # ensure any lingering playback is halted
print("[ggwave] closed")
# ----------------------------
# CLI
# ----------------------------
def parse_args(argv=None):
p = argparse.ArgumentParser(
description="GGWave command listener/transmitter loop."
)
p.add_argument("--samplerate", type=int, default=DEFAULT_SAMPLE_RATE,
help=f"Audio sample rate in Hz (default: {DEFAULT_SAMPLE_RATE})")
p.add_argument("--blocksize", type=int, default=1024,
help="Microphone block size in frames (default: 1024)")
p.add_argument("--input-device", type=str, default=None,
help="sounddevice input device name or index (default: system default)")
p.add_argument("--output-device", type=str, default=None,
help="sounddevice output device name or index (default: system default)")
p.add_argument("--tx-ready", action="store_true",
help='Transmit a short "READY" tone/message on startup')
return p.parse_args(argv)
if __name__ == "__main__":
args = parse_args()
run_loop(args)
@pkdavies
Copy link
Author

deps for audio I/O

sudo apt-get update && sudo apt-get install -y python3.11-venv portaudio19-dev

venv

python3.11 -m venv .venv311
source .venv311/bin/activate

toolchain + libs

pip install -U pip setuptools wheel
pip install numpy sounddevice ggwave==0.4.2

python -c "import ggwave, sounddevice; print('ggwave OK')"

@pkdavies
Copy link
Author

sudo apt update
sudo apt install -y build-essential cmake portaudio19-dev
python3 -m venv .venv
source .venv/bin/activate
pip install -U pip setuptools wheel "cython>=3.0.10" numpy

git clone https://github.com/ggerganov/ggwave.git
cd ggwave/examples/ggwave-py
pip install .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment