Created
April 15, 2026 09:04
-
-
Save ifuller1/e7c015fef8d3528100bd83a6d5bc78bc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Voice Enrollment Tool | |
| Records voice samples and generates a speaker embedding (.npy file). | |
| Copy the resulting .npy to your Wyoming bridge's voice_profiles/ directory. | |
| """ | |
| import argparse | |
| import sys | |
| import wave | |
| from pathlib import Path | |
| import numpy as np | |
| import sounddevice as sd | |
| from resemblyzer import VoiceEncoder, preprocess_wav | |
# Sample rate (Hz) used for microphone capture and passed to preprocess_wav as source_sr.
SAMPLE_RATE = 16000
# Default length, in seconds, of each recorded enrollment sample (--duration).
DEFAULT_DURATION = 5
# Default number of enrollment samples to record (--samples).
DEFAULT_SAMPLES = 3
def list_devices():
    """Print every audio device that offers input channels, flagging the default input."""
    print("\nAvailable input devices:\n")
    # Keep only devices that can actually capture audio, remembering their global index.
    inputs = [
        (index, info)
        for index, info in enumerate(sd.query_devices())
        if info["max_input_channels"] > 0
    ]
    for index, info in inputs:
        # sd.default.device is indexed with [0] here to get the default input device.
        is_default = index == sd.default.device[0]
        marker = " <-- default" if is_default else ""
        channels = info["max_input_channels"]
        print(f" [{index}] {info['name']} ({channels}ch){marker}")
    print()
def record_sample(duration: float, device=None) -> np.ndarray:
    """Record from the mic and return float32 mono audio.

    Args:
        duration: Length of the recording in seconds. Must produce at least
            one frame at SAMPLE_RATE.
        device: Input device, forwarded unchanged to ``sd.rec``.

    Returns:
        One-dimensional float32 array holding the captured samples.

    Raises:
        ValueError: If ``duration`` is zero, negative, or too short to yield
            a single frame.
    """
    frames = int(duration * SAMPLE_RATE)
    if frames <= 0:
        # Without this guard an empty buffer reaches np.max() below and fails
        # with an opaque "zero-size array" error instead of a clear message.
        raise ValueError(f"duration must be positive (got {duration})")

    print(f" Recording for {duration}s...")
    audio = sd.rec(
        frames,
        samplerate=SAMPLE_RATE,
        channels=1,
        dtype="float32",
        device=device,
    )
    sd.wait()  # sd.rec returns immediately; block until the buffer is filled
    audio = audio.flatten()

    # Show a simple level check so the user notices a muted or wrong mic.
    peak = np.max(np.abs(audio))
    if peak < 0.01:
        print(f" Warning: very quiet recording (peak: {peak:.4f}). Check your mic.")
    else:
        print(f" Captured. Peak level: {peak:.3f}")
    return audio
def _pcm_to_float(raw: bytes, sample_width: int) -> np.ndarray:
    """Decode little-endian integer PCM bytes into float32 samples in [-1.0, 1.0)."""
    if sample_width == 1:
        # 8-bit WAV samples are unsigned with silence at 128.
        return (np.frombuffer(raw, dtype=np.uint8).astype(np.float32) - 128.0) / 128.0
    if sample_width == 2:
        return np.frombuffer(raw, dtype="<i2").astype(np.float32) / 32768.0
    if sample_width == 3:
        # numpy has no 24-bit dtype: assemble each 3-byte sample into an int32
        # and sign-extend bit 23 manually.
        parts = np.frombuffer(raw, dtype=np.uint8).reshape(-1, 3).astype(np.int32)
        values = parts[:, 0] | (parts[:, 1] << 8) | (parts[:, 2] << 16)
        values = values - ((values & 0x800000) << 1)
        return values.astype(np.float32) / 8388608.0
    if sample_width == 4:
        return np.frombuffer(raw, dtype="<i4").astype(np.float32) / 2147483648.0
    raise ValueError(f"Unsupported sample width: {sample_width}")


def load_wav(path: Path) -> np.ndarray:
    """Load a WAV file as float32 mono and run it through ``preprocess_wav``.

    Supports 8-, 16-, 24- and 32-bit integer PCM with any number of channels.
    For multi-channel files the first channel is used as the mono signal
    (this matches the previous stereo behaviour, which kept the left channel).

    Args:
        path: Location of the WAV file.

    Returns:
        The result of ``preprocess_wav`` applied to the decoded mono samples,
        with the file's own frame rate passed as ``source_sr``.

    Raises:
        ValueError: If the file uses a sample width other than 1-4 bytes.
    """
    with wave.open(str(path), "rb") as wf:
        sr = wf.getframerate()
        sw = wf.getsampwidth()
        ch = wf.getnchannels()
        raw = wf.readframes(wf.getnframes())

    if sw not in (1, 2, 3, 4):
        raise ValueError(f"Unsupported sample width: {sw}")

    # Drop a trailing partial frame (e.g. truncated file) so that decoding and
    # the per-channel reshape below cannot fail on a ragged buffer.
    frame_bytes = sw * ch
    raw = raw[: len(raw) - len(raw) % frame_bytes]

    audio = _pcm_to_float(raw, sw)
    if ch > 1:
        # Samples are interleaved frame by frame; view them as (frames, channels)
        # and keep channel 0. Slicing with a fixed stride of 2 was only correct
        # for stereo and scrambled files with three or more channels.
        audio = audio.reshape(-1, ch)[:, 0]
    return preprocess_wav(audio, source_sr=sr)
def enroll(name: str, output_dir: Path, files=None,
           num_samples=DEFAULT_SAMPLES, duration=DEFAULT_DURATION, device=None):
    """Create a voice profile and save it as ``<output_dir>/<name>.npy``.

    One embedding is computed per WAV file (when ``files`` is given) or per
    interactively recorded sample; the embeddings are averaged, normalised to
    unit length and written with ``np.save``.

    Args:
        name: Speaker name; used as the profile's file stem.
        output_dir: Directory for the profile; created if missing.
        files: Optional iterable of WAV paths. When truthy, nothing is recorded.
        num_samples: Number of samples to record when ``files`` is not given.
        duration: Seconds per recorded sample.
        device: Input device forwarded to ``record_sample``.

    Exits with status 1 (after printing a message) when there is nothing to
    enroll or when the averaged embedding cannot be normalised, instead of
    silently saving a NaN profile.
    """
    from itertools import combinations  # local import: only needed here

    # Validate before the (slow) model load: zero samples would otherwise make
    # np.mean() return NaN and a useless profile would be written to disk.
    if not files and num_samples < 1:
        print(f"Nothing to enroll: num_samples must be at least 1 (got {num_samples}).")
        sys.exit(1)

    output_dir.mkdir(parents=True, exist_ok=True)
    print("Loading voice encoder model (first run downloads ~17MB)...")
    encoder = VoiceEncoder()
    print("Model loaded.\n")

    embeddings = []
    if files:
        for f in files:
            print(f" Processing: {f}")
            embeddings.append(encoder.embed_utterance(load_wav(f)))
            print(" Embedded OK")
    else:
        print(f"Enrolling: {name}")
        print(f"Recording {num_samples} samples of {duration}s each.")
        print("Speak naturally — a sentence or two per sample.\n")
        for i in range(num_samples):
            input(f"Press Enter to start sample {i + 1}/{num_samples}...")
            audio = record_sample(duration, device=device)
            processed = preprocess_wav(audio, source_sr=SAMPLE_RATE)
            embeddings.append(encoder.embed_utterance(processed))
            print()

    # Average and normalise, refusing to persist a degenerate result.
    profile = np.mean(embeddings, axis=0)
    norm = np.linalg.norm(profile)
    if not np.isfinite(norm) or norm == 0:
        print("Could not build a profile: the samples produced an empty or invalid embedding.")
        sys.exit(1)
    profile = profile / norm

    out_path = output_dir / f"{name}.npy"
    np.save(out_path, profile)
    print(f"Profile saved: {out_path}")

    # Consistency check: mean cosine similarity over every pair of samples.
    if len(embeddings) > 1:
        sims = [
            np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
            for a, b in combinations(embeddings, 2)
        ]
        avg = np.mean(sims)
        print(f"Inter-sample consistency: {avg:.3f} {'(good)' if avg > 0.85 else '(try re-recording)'}")

    print(f"\nCopy {out_path} to your NUC:")
    print(f" scp {out_path} nuc:~/wyoming-bridge/voice_profiles/")
def test_profile(name: str, profiles_dir: Path, files=None, device=None,
                 threshold: float = 0.75, duration: float = 5):
    """Record or load audio and compare it against an enrolled profile.

    Args:
        name: Speaker name; the profile is read from ``<profiles_dir>/<name>.npy``.
        profiles_dir: Directory containing saved profiles.
        files: Optional iterable of WAV paths to score. When not given, one
            sample is recorded from the microphone instead.
        device: Input device forwarded to ``record_sample``.
        threshold: Minimum cosine similarity reported as ``MATCH``
            (default 0.75, the value previously hard-coded).
        duration: Seconds to record for the live test (default 5, as before).

    Exits with status 1 after printing a message if the profile file is missing.
    """
    profile_path = profiles_dir / f"{name}.npy"
    if not profile_path.exists():
        print(f"No profile found for '{name}' at {profile_path}")
        sys.exit(1)

    profile = np.load(profile_path)
    profile_norm = np.linalg.norm(profile)
    encoder = VoiceEncoder()

    def score_audio(audio):
        """Return (cosine similarity to the profile, MATCH/NO MATCH label)."""
        emb = encoder.embed_utterance(audio)
        score = np.dot(emb, profile) / (np.linalg.norm(emb) * profile_norm)
        return score, "MATCH" if score >= threshold else "NO MATCH"

    if files:
        for f in files:
            score, status = score_audio(load_wav(f))
            print(f" {f.name}: {score:.3f} [{status}]")
    else:
        input("Press Enter to record a test sample...")
        audio = record_sample(duration, device=device)
        score, status = score_audio(preprocess_wav(audio, source_sr=SAMPLE_RATE))
        print(f"\n Score: {score:.3f} [{status}]")
def list_profiles(profiles_dir: Path):
    """Print the stem of every ``*.npy`` file in ``profiles_dir``, in sorted order."""
    stems = [entry.stem for entry in sorted(profiles_dir.glob("*.npy"))]
    if stems:
        print(f"\nProfiles in {profiles_dir}:\n")
        for stem in stems:
            print(f" {stem}")
    else:
        print("No profiles found.")
def main():
    """Parse the command line and run exactly one action.

    Precedence when several flags are combined: --list-devices, --list,
    --delete, --test, and finally enrollment (the default action).
    """
    parser = argparse.ArgumentParser(
        description="Voice enrollment for speaker identification",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --name ian Record 3 samples interactively
%(prog)s --name ian --files a.wav b.wav Enroll from existing WAVs
%(prog)s --name ian --device 2 Use a specific mic
%(prog)s --test --name ian Record and verify against profile
%(prog)s --list-devices Show available microphones
%(prog)s --list Show enrolled profiles
""",
    )
    add = parser.add_argument
    add("--name", "-n", help="Speaker name")
    add("--files", "-f", nargs="+", type=Path, help="WAV files for enrollment")
    add("--output-dir", "-o", type=Path,
        default=Path(__file__).parent / "voice_profiles",
        help="Output directory for profiles")
    add("--samples", "-s", type=int, default=DEFAULT_SAMPLES,
        help=f"Number of samples to record (default: {DEFAULT_SAMPLES})")
    add("--duration", "-d", type=float, default=DEFAULT_DURATION,
        help=f"Seconds per sample (default: {DEFAULT_DURATION})")
    add("--device", type=int, help="Audio input device index")
    add("--test", "-t", action="store_true", help="Test against a profile")
    add("--list", "-l", action="store_true", help="List enrolled profiles")
    add("--list-devices", action="store_true", help="List audio input devices")
    add("--delete", action="store_true", help="Delete a profile")
    opts = parser.parse_args()

    # Informational actions need no speaker name.
    if opts.list_devices:
        list_devices()
        return
    if opts.list:
        list_profiles(opts.output_dir)
        return

    if opts.delete:
        if not opts.name:
            parser.error("--delete requires --name")
        target = opts.output_dir / f"{opts.name}.npy"
        if target.exists():
            target.unlink()
            print(f"Deleted: {opts.name}")
        else:
            print(f"Not found: {opts.name}")
        return

    if opts.test:
        if not opts.name:
            parser.error("--test requires --name")
        test_profile(opts.name, opts.output_dir, opts.files, opts.device)
        return

    # No action flag given: enroll a new profile.
    if not opts.name:
        parser.error("--name is required for enrollment")
    enroll(opts.name, opts.output_dir, opts.files, opts.samples, opts.duration, opts.device)
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment