Skip to content

Instantly share code, notes, and snippets.

@twobob
Last active April 16, 2026 10:50
Show Gist options
  • Select an option

  • Save twobob/e8069711d5357ccebc685ebca53c5435 to your computer and use it in GitHub Desktop.

Select an option

Save twobob/e8069711d5357ccebc685ebca53c5435 to your computer and use it in GitHub Desktop.
STT using Parakeet and, optionally, Canary
# setup_complete_pytorch_stack_128_270_313.ps1
# Bootstraps a per-user Miniconda install, then provisions a Python 3.13
# environment with PyTorch 2.7.0 (CUDA 12.8 wheels) and the NeMo ASR stack
# plus the helper packages used by the STT script (keyboard, sounddevice,
# soundfile, pyperclip).
$EnvName = "128_270_313"
$InstallDir = "$env:USERPROFILE\Miniconda3"
$CondaPath = "$InstallDir\Scripts\conda.exe"
Write-Host "1. Downloading Miniconda..."
Invoke-WebRequest -Uri "https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe" -OutFile "miniconda_installer.exe"
Write-Host "2. Installing Miniconda..."
# /S = silent, JustMe install; /D must be the last argument for NSIS installers.
Start-Process -FilePath ".\miniconda_installer.exe" -ArgumentList "/InstallationType=JustMe /RegisterPython=0 /S /D=$InstallDir" -Wait
Write-Host "3. Initializing shell profiles..."
& $CondaPath init powershell
& $CondaPath init cmd.exe
Remove-Item ".\miniconda_installer.exe"
Write-Host "4. Accepting Anaconda Terms of Service..."
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r
& $CondaPath tos accept --override-channels --channel https://repo.anaconda.com/pkgs/msys2
Write-Host "5. Provisioning Python 3.13 environment ('$EnvName')..."
& $CondaPath create --name $EnvName python=3.13 -y
Write-Host "6. Installing PyTorch 2.7.0 and NVIDIA CUDA 12.8 toolkit via PIP wheel index... this may take a while, be patient"
& $CondaPath run -n $EnvName pip install torch==2.7.0 torchvision==0.22.0 torchaudio==2.7.0 --index-url https://download.pytorch.org/whl/cu128
Write-Host "7. Installing editdistance..."
& $CondaPath install --name $EnvName editdistance -y
Write-Host "8. installing extras, one sec"
& $CondaPath run -n $EnvName pip install nemo_toolkit["asr"] keyboard sounddevice soundfile pyperclip
Write-Host "Operation Complete. Restart your terminal and run 'conda activate $EnvName' to begin development."
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import queue
import sys
import tempfile
import threading
import time
import re
from functools import reduce
from pathlib import Path
from typing import Any
# Model checkpoints; override via PARAKEET_MODEL / CANARY_MODEL env vars.
DEFAULT_PARAKEET_MODEL = os.environ.get("PARAKEET_MODEL", "nvidia/parakeet-tdt-0.6b-v3")
DEFAULT_CANARY_MODEL = os.environ.get("CANARY_MODEL", "nvidia/canary-qwen-2.5b")
# Inference device/dtype selection; "auto" is resolved later by ModelManager.
DEFAULT_DEVICE = os.environ.get("TRANSCRIBE_DEVICE", "auto").strip().lower()
DEFAULT_DTYPE = os.environ.get("TRANSCRIBE_DTYPE", "auto").strip().lower()
# Microphone capture / endpointing tuning knobs (consumed by record_phrase_to_wav).
DEFAULT_SAMPLE_RATE = int(os.environ.get("MIC_SAMPLE_RATE", "16000"))  # Hz
DEFAULT_START_THRESHOLD = float(os.environ.get("MIC_START_THRESHOLD", "0.015"))  # RMS level that counts as speech
DEFAULT_STOP_THRESHOLD = float(os.environ.get("MIC_STOP_THRESHOLD", "0.010"))  # RMS level that counts as silence
DEFAULT_MIN_SPEECH_SECONDS = float(os.environ.get("MIC_MIN_SPEECH_SECONDS", "0.18"))
DEFAULT_SILENCE_SECONDS = float(os.environ.get("MIC_SILENCE_SECONDS", "0.85"))
DEFAULT_PREROLL_SECONDS = float(os.environ.get("MIC_PREROLL_SECONDS", "0.35"))
DEFAULT_MAX_RECORD_SECONDS = float(os.environ.get("MIC_MAX_RECORD_SECONDS", "60.0"))
DEFAULT_BLOCK_MS = int(os.environ.get("MIC_BLOCK_MS", "30"))
# Settings file location: ~/.config on POSIX, AppData\Local on Windows;
# ANTIGRAVITY_STT_CONFIG overrides both.
DEFAULT_CONFIG_PATH = Path(
    os.environ.get(
        "ANTIGRAVITY_STT_CONFIG",
        str(Path.home() / ".config" / "antigravity" / "stt_config.json") if os.name != "nt"
        else str(Path.home() / "AppData" / "Local" / "Antigravity" / "stt_config.json"),
    )
).expanduser()
class StatusReporter:
    """Thread-safe single-line status display written to stderr.

    Each message overwrites the previous one in place via a carriage
    return; shorter messages are padded with spaces so they fully cover
    the longer text they replace.
    """

    def __init__(self, enabled: bool = True) -> None:
        self.enabled = enabled
        self._lock = threading.RLock()
        self._last_len = 0  # length of the most recently printed status line

    def set_enabled(self, enabled: bool) -> None:
        """Turn status output on or off."""
        with self._lock:
            self.enabled = enabled

    def show(self, message: str) -> None:
        """Overwrite the current status line with *message*."""
        with self._lock:
            if not self.enabled:
                return
            line = f"[STT] {message}"
            pad = max(0, self._last_len - len(line))
            print("\r" + line + " " * pad, file=sys.stderr, end="", flush=True)
            self._last_len = len(line)

    def clear(self) -> None:
        """Blank out the current status line, if any."""
        with self._lock:
            if not self.enabled or self._last_len == 0:
                return
            blank = " " * (self._last_len + 6)
            print("\r" + blank + "\r", file=sys.stderr, end="", flush=True)
            self._last_len = 0

    def done(self, message: str) -> None:
        """Show *message*, then advance to a fresh line and reset."""
        with self._lock:
            if not self.enabled:
                return
            self.show(message)
            print(file=sys.stderr, flush=True)
            self._last_len = 0
# Module-wide status reporter; CLI subcommands may disable it via --quiet.
STATUS = StatusReporter(enabled=True)
class ConfigManager:
def __init__(self, path: Path) -> None:
self.path = path
self._lock = threading.RLock()
def load(self) -> dict[str, Any]:
with self._lock:
if not self.path.exists():
return {}
try:
return json.loads(self.path.read_text(encoding="utf-8"))
except Exception:
return {}
def save(self, data: dict[str, Any]) -> None:
with self._lock:
self.path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.path.with_suffix(self.path.suffix + ".tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(self.path)
def get_preferred_microphone(self) -> dict[str, Any] | None:
item = self.load().get("preferred_microphone")
return item if isinstance(item, dict) else None
def set_preferred_microphone(self, microphone: dict[str, Any] | None) -> None:
data = self.load()
if microphone is None:
data.pop("preferred_microphone", None)
else:
data["preferred_microphone"] = microphone
self.save(data)
# Shared on-disk settings store (currently holds only the preferred microphone).
CONFIG = ConfigManager(DEFAULT_CONFIG_PATH)
class ModelManager:
    """Lazy, cached loader for the Parakeet (ASR) and Canary (SALM) models.

    Models are downloaded/loaded on first use and reused for subsequent
    requests with the same model name; access is serialized with a lock.
    """

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._parakeet = None
        self._parakeet_name: str | None = None
        self._canary = None
        self._canary_name: str | None = None

    def _torch(self):
        # Imported lazily so subcommands that never transcribe avoid the cost.
        import torch
        return torch

    def resolve_device(self) -> str:
        """Return the target device string, honoring TRANSCRIBE_DEVICE."""
        if DEFAULT_DEVICE != "auto":
            return DEFAULT_DEVICE
        return "cuda" if self._torch().cuda.is_available() else "cpu"

    def resolve_dtype(self):
        """Return the torch dtype for inference, honoring TRANSCRIBE_DTYPE."""
        torch = self._torch()
        if DEFAULT_DTYPE == "auto":
            return torch.float16 if self.resolve_device() == "cuda" else torch.float32
        dtype_map = {
            "float16": torch.float16,
            "float32": torch.float32,
            "bfloat16": torch.bfloat16,
        }
        return dtype_map[DEFAULT_DTYPE]

    def get_parakeet(self, model_name: str):
        """Return a cached Parakeet ASR model, loading it on first use."""
        with self._lock:
            if self._parakeet_name == model_name and self._parakeet is not None:
                return self._parakeet
            STATUS.show(f"DOWNLOADING / LOADING PARAKEET: {model_name}")
            import nemo.collections.asr as nemo_asr
            model = nemo_asr.models.ASRModel.from_pretrained(model_name=model_name)
            if self.resolve_device() == "cuda":
                model = model.cuda()
            model.eval()
            self._parakeet = model
            self._parakeet_name = model_name
            STATUS.show(f"PARAKEET READY: {model_name}")
            return model

    def get_canary(self, model_name: str):
        """Return a cached Canary SALM model, loading it on first use."""
        with self._lock:
            if self._canary_name == model_name and self._canary is not None:
                return self._canary
            STATUS.show(f"DOWNLOADING / LOADING CANARY: {model_name}")
            from nemo.collections.speechlm2.models import SALM
            model = SALM.from_pretrained(model_name)
            try:
                # Best-effort device/dtype placement; tolerate models that
                # reject .cuda()/.to()/.eval().
                if self.resolve_device() == "cuda":
                    model = model.cuda()
                if hasattr(model, "to"):
                    model = model.to(dtype=self.resolve_dtype())
                model.eval()
            except Exception:
                pass
            self._canary = model
            self._canary_name = model_name
            STATUS.show(f"CANARY READY: {model_name}")
            return model
# Process-wide model cache shared by all transcription calls.
MODELS = ModelManager()
def query_microphones() -> dict[str, Any]:
    """Enumerate input-capable audio devices via sounddevice.

    Returns {"devices": [...], "default_input_index": int | None}, keeping
    only devices that expose at least one input channel.
    """
    import sounddevice as sd
    devices = sd.query_devices()
    try:
        default_input_index = sd.default.device[0]
    except Exception:
        default_input_index = None
    mics = []
    for idx, raw in enumerate(devices):
        channels = int(raw.get("max_input_channels", 0) or 0)
        if channels <= 0:
            continue  # output-only device
        mics.append({
            "index": int(idx),
            "name": str(raw.get("name", f"Input {idx}")),
            "max_input_channels": channels,
            "default_samplerate": float(raw.get("default_samplerate", 0.0) or 0.0),
            "is_default_input": default_input_index is not None and int(idx) == int(default_input_index),
        })
    return {"devices": mics, "default_input_index": default_input_index}
def resolve_microphone(device: int | str | None) -> tuple[int | None, dict[str, Any] | None]:
    """Resolve *device* to an (index, device-record) pair.

    None selects the saved preferred microphone if still present, else the
    system default input. An int or digit string selects by index; any
    other string matches by exact name first, then case-insensitive
    substring. Raises ValueError when an explicit selection is not found.
    """
    info = query_microphones()
    mics = info["devices"]

    def _by_index(idx: int) -> dict[str, Any] | None:
        # Linear scan; device lists are tiny.
        for candidate in mics:
            if int(candidate["index"]) == idx:
                return candidate
        return None

    if device is None:
        preferred = CONFIG.get_preferred_microphone()
        if preferred is not None and isinstance(preferred.get("index"), int):
            saved = _by_index(int(preferred["index"]))
            if saved is not None:
                return int(saved["index"]), saved
        default_index = info["default_input_index"]
        if default_index is None:
            return None, None
        default_mic = _by_index(int(default_index))
        if default_mic is not None:
            return int(default_mic["index"]), default_mic
        return int(default_index), None

    if isinstance(device, int) or (isinstance(device, str) and device.isdigit()):
        idx = int(device)
        found = _by_index(idx)
        if found is None:
            raise ValueError(f"Microphone index not found: {idx}")
        return idx, found

    target = str(device).strip().lower()
    exact = None
    partial = None
    for mic in mics:
        name = str(mic["name"]).lower()
        if name == target:
            exact = mic
            break
        if partial is None and target in name:
            partial = mic
    match = exact or partial
    if match is None:
        raise ValueError(f"Microphone name not found: {device}")
    return int(match["index"]), match
def pick_record_samplerate(requested: int, mic: dict[str, Any] | None) -> int:
if requested > 0:
return requested
if mic is not None:
rate = int(float(mic.get("default_samplerate") or 0.0))
if rate > 0:
return rate
return DEFAULT_SAMPLE_RATE
def record_phrase_to_wav(
    output_path: Path,
    device: int | None,
    sample_rate: int,
    start_threshold: float,
    stop_threshold: float,
    min_speech_seconds: float,
    silence_seconds: float,
    preroll_seconds: float,
    max_record_seconds: float,
    block_ms: int,
) -> dict[str, Any]:
    """Capture one speech utterance from the microphone into a 16-bit WAV.

    Energy-gated endpointing: blocks are buffered until RMS stays at or
    above start_threshold for min_speech_seconds (recording then starts,
    prepending up to preroll_seconds of buffered audio); recording stops
    after silence_seconds below stop_threshold, or at max_record_seconds
    overall.

    Args:
        output_path: Destination WAV file (parent dirs created as needed).
        device: sounddevice input device index, or None for the default.
        sample_rate: Capture rate in Hz.
        start_threshold: RMS level that counts as speech onset.
        stop_threshold: RMS level that counts as silence once speech started.
        min_speech_seconds: Sustained speech required to trigger recording.
        silence_seconds: Sustained silence that ends the recording.
        preroll_seconds: Audio retained from just before speech onset.
        max_record_seconds: Hard cap on total capture time.
        block_ms: Duration of each audio callback block, in milliseconds.

    Returns:
        Timing/level stats: capture_ms, write_wav_ms, total_capture_ms, peak_rms.

    Raises:
        RuntimeError: If capture ends without any detected speech.
    """
    import collections
    import numpy as np
    import sounddevice as sd
    import soundfile as sf
    # Convert the time-based tuning knobs into whole block counts (min 1).
    block_frames = max(1, int(sample_rate * (block_ms / 1000.0)))
    preroll_blocks = max(1, int(round(preroll_seconds * sample_rate / block_frames)))
    min_speech_blocks = max(1, int(round(min_speech_seconds * sample_rate / block_frames)))
    silence_blocks_to_stop = max(1, int(round(silence_seconds * sample_rate / block_frames)))
    max_blocks = max(1, int(round(max_record_seconds * sample_rate / block_frames)))
    q: queue.Queue[Any] = queue.Queue()
    preroll = collections.deque(maxlen=preroll_blocks)  # rolling pre-speech buffer
    utterance: list[np.ndarray] = []
    speech_started = False
    speech_count = 0   # consecutive above-threshold blocks (pre-speech)
    silence_count = 0  # consecutive below-threshold blocks (while recording)
    total_blocks = 0
    peak_rms = 0.0

    def callback(indata, frames, time_info, status):
        # Runs on the audio thread: just hand a copy of the block to the
        # main loop via the queue. `status` is intentionally ignored.
        q.put(indata.copy())

    STATUS.show("LISTENING")
    t0 = time.perf_counter()
    with sd.InputStream(
        samplerate=sample_rate,
        channels=1,
        dtype="float32",
        blocksize=block_frames,
        callback=callback,
        device=device,
    ):
        while True:
            block = q.get()
            total_blocks += 1
            mono = block[:, 0] if block.ndim > 1 else block
            # Block energy (RMS) computed in float64 for stability.
            rms = float((mono.astype("float64") ** 2).mean() ** 0.5)
            peak_rms = max(peak_rms, rms)
            if not speech_started:
                # Waiting for speech: keep a rolling pre-roll and count
                # consecutive loud blocks until the onset criterion is met.
                preroll.append(block)
                if rms >= start_threshold:
                    speech_count += 1
                else:
                    speech_count = 0
                if speech_count >= min_speech_blocks:
                    speech_started = True
                    STATUS.show("SPEECH DETECTED")
                    utterance.extend(list(preroll))
                    # NOTE(review): `block` was already appended to `preroll`
                    # above, so this appends it a second time (~one block of
                    # duplicated audio at the onset) — confirm intended.
                    utterance.append(block)
                    silence_count = 0
            else:
                # Recording: accumulate blocks and count consecutive quiet
                # blocks to detect the end of the phrase.
                utterance.append(block)
                if rms <= stop_threshold:
                    silence_count += 1
                else:
                    silence_count = 0
                if silence_count >= silence_blocks_to_stop:
                    break
            if total_blocks >= max_blocks:
                break
    t1 = time.perf_counter()
    if not utterance:
        raise RuntimeError("No speech detected.")
    audio = np.concatenate(utterance, axis=0).astype("float32", copy=False)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(str(output_path), audio, sample_rate, subtype="PCM_16")
    t2 = time.perf_counter()
    return {
        "capture_ms": round((t1 - t0) * 1000.0, 3),
        "write_wav_ms": round((t2 - t1) * 1000.0, 3),
        "total_capture_ms": round((t2 - t0) * 1000.0, 3),
        "peak_rms": round(peak_rms, 6),
    }
def transcribe_file(audio_path: Path, mode: str) -> str:
    """Transcribe the WAV at *audio_path* and return the transcript text.

    mode == "fast" uses the Parakeet ASR model; any other mode uses the
    Canary SALM model driven by a chat-style transcription prompt.

    Raises:
        RuntimeError: If Canary generation or output decoding fails.
    """
    STATUS.show(f"TRANSCRIBING ({mode})")
    if mode == "fast":
        model = MODELS.get_parakeet(DEFAULT_PARAKEET_MODEL)
        try:
            result = model.transcribe(
                [str(audio_path)],
                batch_size=1,
                verbose=False,
                return_hypotheses=True,
            )
        except TypeError:
            # Fallback for transcribe() signatures that lack return_hypotheses.
            result = model.transcribe(
                [str(audio_path)],
                batch_size=1,
                verbose=False,
            )
        # Normalize the possible return shapes (tuple / list / single value)
        # down to the first hypothesis.
        if isinstance(result, tuple):
            result = result[0]
        if not isinstance(result, list):
            result = [result]
        hyp = result[0] if result else None
        if hasattr(hyp, "text"):
            return str(hyp.text or "").strip()
        return str(hyp or "").strip()
    model = MODELS.get_canary(DEFAULT_CANARY_MODEL)
    # The locator tag marks where the audio is injected into the prompt.
    audio_locator = getattr(model, "audio_locator_tag", "<|audioplaceholder|>")
    audio_str = str(audio_path)
    # Exact schema mandated by NeMo SALM documentation
    prompts = [
        [
            {
                "role": "user",
                "content": f"Transcribe the following: {audio_locator}",
                "audio": [audio_str]
            }
        ]
    ]
    try:
        answer_ids = model.generate(prompts=prompts, max_new_tokens=1024)
    except Exception as exc:
        raise RuntimeError(f"Canary generation failed: {exc}")
    try:
        # Move token ids off the accelerator (if it's a tensor) into lists.
        if hasattr(answer_ids, "cpu"):
            tokens = answer_ids.cpu().tolist()
        else:
            tokens = answer_ids
        # Flatten nested lists sequentially
        while isinstance(tokens, list) and len(tokens) > 0 and isinstance(tokens[0], list):
            tokens = tokens[0]
        # Prefer HF-style decode; fall back to the NeMo tokenizer API.
        if hasattr(model.tokenizer, "decode"):
            text_out = model.tokenizer.decode(tokens, skip_special_tokens=True)
        elif hasattr(model.tokenizer, "ids_to_text"):
            text_out = model.tokenizer.ids_to_text(tokens)
        else:
            text_out = str(tokens)
    except Exception as exc:
        raise RuntimeError(f"Failed to decode Canary output: {exc}")
    # Strip Qwen ChatML artifacts
    if "<|im_start|>assistant" in text_out:
        text_out = text_out.split("<|im_start|>assistant")[-1]
    text_out = text_out.replace("<|im_end|>", "").replace("<|im_start|>", "").strip()
    return text_out
# Ordered (pattern, replacement-template) spelling rules, applied sequentially
# with case-insensitive matching. Order is significant: earlier rewrites feed
# later patterns. Negative lookaheads exempt words whose US form is standard.
_US_TO_UK_RULES: list[tuple[str, str]] = [
    (r"\b(\w+)yz(e|es|ed|ing)\b", r"\1ys\2"),  # analyze -> analyse
    (r"\b(?!(?:size|prize|capsize|seize|maize|assize|glaze|gaze|raze|doze|blaze)\b)(\w+)iz(e|es|ed|ing)\b", r"\1is\2"),  # organize -> organise
    (r"\b(?!(?:actor|author|doctor|error|motor|sponsor|mirror|major|minor|sensor|factor|prior|mayor|senator|governor|chancellor|successor|vendor|visitor|terror|honorary)\b)(\w{2,})or(s|)\b", r"\1our\2"),  # color -> colour
    (r"\b(\w*[aeiou])l(ed|ing|er|ers)\b", r"\1ll\2"),  # traveled -> travelled
    (r"\b(cent|met|theat|lit|fib|sombr|meagr|calibr|lust|spect|sepulch)er(s|)\b", r"\1re\2"),  # center -> centre
    (r"\b(\w+)(log|gog)(s|)\b", r"\1\2ue\3"),  # catalog -> catalogue
    (r"\b(def|off|pret)ense(s|)\b", r"\1ence\2"),  # defense -> defence
    (r"\b(an|p|orthop|gyn|leuk|an|arch|encyclop|h)e(m|diatr|d|col|sthes|ol|matol)", r"\1ae\2"),  # anemia -> anaemia
    (r"\b(estrogen|esophagus|edema)\b", r"o\1"),  # estrogen -> oestrogen
    (r"\bmaneuver(s|ed|ing|)\b", r"manoeuvre\1"),
    (r"\baluminum\b", "aluminium"),
    (r"\bcheck(s|)\b", r"cheque\1"),
    (r"\bjewelry\b", "jewellery"),
    (r"\bprogram(s|)\b", r"programme\1"),
    (r"\bmold(s|)\b", r"mould\1"),
    (r"\bgray\b", "grey"),
]


def _match_case(original: str, expansion: str) -> str:
    """Re-case *expansion* to mirror *original*: ALL-CAPS, Capitalized, or lowercase."""
    if original.isupper():
        return expansion.upper()
    if original[0].isupper():
        return expansion[0].upper() + expansion[1:]
    return expansion.lower()


def convert_us_to_uk_orthography_oneliner(text: str) -> str:
    """Convert common US spellings in *text* to UK spellings.

    High-coverage heuristic: each rule in _US_TO_UK_RULES is applied in
    order (case-insensitively), and each replacement inherits the casing
    of the matched word. Not a dictionary-backed converter — exceptions
    beyond the lookahead lists above are not handled.

    Args:
        text: Arbitrary text to rewrite.

    Returns:
        The text with matched US spellings rewritten to UK forms.
    """
    for pattern, template in _US_TO_UK_RULES:
        def _substitute(match: re.Match) -> str:
            # Expand the template against this match, then restore casing.
            return _match_case(match.group(0), match.expand(template))
        text = re.sub(pattern, _substitute, text, flags=re.IGNORECASE)
    return text
def emit_text_at_cursor(text: str, paste: bool = True) -> None:
    """Insert *text* at the active cursor position.

    With paste=True the text goes through the clipboard followed by a
    simulated Ctrl+V; otherwise it is typed out key by key. Empty/falsy
    text is a no-op.
    """
    if not text:
        return
    STATUS.show("PASTING")
    import keyboard
    if not paste:
        keyboard.write(text, delay=0)
        return
    import pyperclip
    pyperclip.copy(text)
    time.sleep(0.05)  # brief pause so the clipboard is populated before pasting
    keyboard.press_and_release("ctrl+v")
def capture_and_transcribe(microphone: int | str | None, mode: str, sample_rate: int, paste: bool, no_uk_spelling: bool = False) -> str:
    """Record one utterance, transcribe it, and emit the text at the cursor.

    Resolves the microphone, records into a temporary WAV (removed
    afterwards), transcribes with the selected model, optionally applies
    the US->UK spelling pass, pastes/types the result, and prints a JSON
    summary to stdout.

    Args:
        microphone: Device index, name, or None for preferred/default.
        mode: "fast" (Parakeet) or any other value (Canary).
        sample_rate: Requested capture rate; 0 defers to the device default.
        paste: Clipboard-paste the text (True) or type it key by key (False).
        no_uk_spelling: Skip the US->UK orthography conversion when True.

    Returns:
        The final transcript text.
    """
    idx, mic = resolve_microphone(microphone)
    rate = pick_record_samplerate(sample_rate, mic)
    mic_name = str(mic["name"]) if mic is not None and "name" in mic else f"device {idx}"
    STATUS.show(f"INITIALIZING MIC: {mic_name} @ {rate} Hz")
    # delete=False so the recorder can reopen the file by path; it is
    # removed explicitly in the finally block below.
    tmp = tempfile.NamedTemporaryFile(prefix="antigravity_phrase_", suffix=".wav", delete=False)
    tmp.close()
    wav_path = Path(tmp.name)
    try:
        stats = record_phrase_to_wav(
            wav_path,
            device=idx,
            sample_rate=rate,
            start_threshold=DEFAULT_START_THRESHOLD,
            stop_threshold=DEFAULT_STOP_THRESHOLD,
            min_speech_seconds=DEFAULT_MIN_SPEECH_SECONDS,
            silence_seconds=DEFAULT_SILENCE_SECONDS,
            preroll_seconds=DEFAULT_PREROLL_SECONDS,
            max_record_seconds=DEFAULT_MAX_RECORD_SECONDS,
            block_ms=DEFAULT_BLOCK_MS,
        )
        text = transcribe_file(wav_path, mode=mode)
        if not no_uk_spelling:
            text = convert_us_to_uk_orthography_oneliner(text)
        emit_text_at_cursor(text, paste=paste)
        STATUS.done("DONE")
        print(json.dumps({
            "text": text,
            "microphone": mic,
            "sample_rate": rate,
            "stats": stats,
            "mode": mode,
        }, ensure_ascii=False))
        return text
    finally:
        # Best-effort removal of the temporary WAV.
        try:
            wav_path.unlink(missing_ok=True)
        except Exception:
            pass
def cmd_list_mics() -> int:
    """CLI: print the available input devices as pretty-printed JSON."""
    payload = json.dumps(query_microphones(), ensure_ascii=False, indent=2)
    print(payload)
    return 0
def cmd_set_mic(device: str) -> int:
    """CLI: resolve *device* and persist it as the preferred microphone."""
    _, mic = resolve_microphone(device)
    if mic is None:
        raise RuntimeError("Microphone not found.")
    CONFIG.set_preferred_microphone(mic)
    report = {"preferred_microphone": mic, "config_path": str(CONFIG.path)}
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0
def cmd_clear_mic() -> int:
    """CLI: forget any saved preferred microphone."""
    CONFIG.set_preferred_microphone(None)
    report = {"preferred_microphone": None, "config_path": str(CONFIG.path)}
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0
def cmd_once(args: argparse.Namespace) -> int:
    """CLI: capture and transcribe a single utterance, then exit."""
    STATUS.set_enabled(not args.quiet)
    capture_and_transcribe(
        args.microphone,
        args.mode,
        args.sample_rate,
        paste=not args.type_keys,
        no_uk_spelling=args.no_uk_spelling,
    )
    return 0
def cmd_hotkey(args: argparse.Namespace) -> int:
    """CLI: loop forever, capturing one utterance each time the hotkey fires."""
    import keyboard
    STATUS.set_enabled(not args.quiet)
    print(f"Ready. Press {args.hotkey} to capture one utterance. Press {args.quit_hotkey} to exit.")
    while True:
        # Block until the capture hotkey is pressed.
        keyboard.wait(args.hotkey)
        capture_and_transcribe(args.microphone, args.mode, args.sample_rate, paste=not args.type_keys, no_uk_spelling=args.no_uk_spelling)
        time.sleep(0.2)
        # NOTE(review): the quit hotkey is only sampled in this brief window
        # right after a capture; while keyboard.wait() above is blocking it
        # has no effect. A keyboard.add_hotkey-based quit would be more
        # responsive — confirm before changing behavior.
        if keyboard.is_pressed(args.quit_hotkey):
            break
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser: list-mics / set-mic / clear-mic / once / hotkey.

    The capture subcommands ("once", "hotkey") share a common option set;
    "hotkey" additionally takes the trigger and quit key combinations.
    """
    parser = argparse.ArgumentParser(description="Local microphone STT that types/pastes transcript at the active cursor.")
    sub = parser.add_subparsers(dest="cmd", required=True)
    sub.add_parser("list-mics")
    set_mic = sub.add_parser("set-mic")
    set_mic.add_argument("device")
    sub.add_parser("clear-mic")
    for command in ("once", "hotkey"):
        capture = sub.add_parser(command)
        capture.add_argument("--microphone", default=None)
        capture.add_argument("--mode", choices=["fast", "high_quality"], default="fast")
        capture.add_argument("--sample-rate", type=int, default=0)
        capture.add_argument("--type-keys", action="store_true", help="Type characters instead of clipboard-paste.")
        capture.add_argument("--no-uk-spelling", action="store_true", help="Bypass US to UK orthography conversion.")
        capture.add_argument("--quiet", action="store_true", help="Disable runtime status messages.")
        if command == "hotkey":
            capture.add_argument("--hotkey", default="ctrl+alt+space")
            capture.add_argument("--quit-hotkey", default="ctrl+alt+q")
    return parser
def main() -> int:
    """Entry point: parse arguments and dispatch to the subcommand handler."""
    args = build_parser().parse_args()
    if args.cmd == "list-mics":
        return cmd_list_mics()
    if args.cmd == "set-mic":
        return cmd_set_mic(args.device)
    if args.cmd == "clear-mic":
        return cmd_clear_mic()
    if args.cmd in ("once", "hotkey"):
        handler = cmd_once if args.cmd == "once" else cmd_hotkey
        return handler(args)
    # Unreachable while the subparser has required=True, kept as a guard.
    raise RuntimeError(f"Unsupported command: {args.cmd}")
if __name__ == "__main__":
    # Exit cleanly (status 0) on Ctrl+C instead of printing a traceback.
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(0)
@twobob
Copy link
Copy Markdown
Author

twobob commented Mar 21, 2026

Canary (high-quality mode):
python .\stt.py hotkey --mode high_quality

Regular (Parakeet, fast mode):
python .\stt.py hotkey

@twobob
Copy link
Copy Markdown
Author

twobob commented Apr 12, 2026

powershell setup_complete_pytorch_stack_128_270_313.ps1

conda activate 128_270_313

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment