Skip to content

Instantly share code, notes, and snippets.

@GOROman
Created April 29, 2026 02:42
Show Gist options
  • Select an option

  • Save GOROman/868e2007ca621fa6113fb71f6a9ba5de to your computer and use it in GitHub Desktop.

Select an option

Save GOROman/868e2007ca621fa6113fb71f6a9ba5de to your computer and use it in GitHub Desktop.
Groq - Speech to Text (STT) API のテスト
import os
import select
import sys
import termios
import tempfile
import threading
import time
import tty
import wave
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import sounddevice as sd
from dotenv import load_dotenv
from groq import Groq
# Speech-to-text model used when the GROQ_STT_MODEL environment variable is unset.
DEFAULT_MODEL = "whisper-large-v3-turbo"
# Language code passed with every transcription request.
LANGUAGE = "ja"
# Recordings shorter than this many seconds are discarded instead of being sent.
MIN_RECORD_SECONDS = 0.15
# While recording, the recording is stopped if no key arrives within this many
# seconds (presumably relies on keyboard auto-repeat while Space is held — confirm).
SPACE_RELEASE_TIMEOUT_SECONDS = 0.9
# Sample rate, in Hz, of the WAV file that is written and uploaded.
TARGET_SAMPLE_RATE = 16_000
@dataclass
class Recording:
    """A finished recording: the WAV file on disk plus the captured duration."""

    # Location of the WAV file holding the captured audio.
    path: Path
    # Length of the captured audio, in seconds.
    duration_seconds: float
class PushToTalkRecorder:
    """Record microphone audio between ``start()`` and ``stop()`` calls.

    Audio is captured as mono float32 blocks through a ``sounddevice``
    input stream, collected in memory, and on ``stop()`` converted to a
    16-bit PCM WAV file at ``TARGET_SAMPLE_RATE``.

    ``self.lock`` guards the recording state and the frame list, because
    the stream callback (``_on_audio``) runs concurrently with the caller.
    """

    def __init__(self) -> None:
        """Query the input device and initialise an idle recorder."""
        device_info = sd.query_devices(kind="input")
        self.device_name = str(device_info["name"])
        self.default_sample_rate = int(device_info["default_samplerate"])
        # Rate the stream is actually opened with; may fall back to the
        # device default in _open_input_stream().
        self.input_sample_rate = TARGET_SAMPLE_RATE
        self.channels = 1
        self.frames: list[np.ndarray] = []
        self.stream: sd.InputStream | None = None
        self.lock = threading.Lock()
        self.is_recording = False
        self.is_transcribing = False
        self.indicator_stop = threading.Event()
        self.indicator_thread: threading.Thread | None = None

    def start(self) -> None:
        """Begin capturing audio.

        Does nothing if a recording is already running or a transcription
        is still in progress. Any exception raised while opening or
        starting the stream propagates to the caller.
        """
        with self.lock:
            if self.is_recording or self.is_transcribing:
                return
            self.frames = []
            stream = self._open_input_stream()
            try:
                stream.start()
            except Exception:
                # Do not leak an opened-but-unstarted stream.
                stream.close()
                raise
            self.stream = stream
            self.is_recording = True
            # Print the settings line *before* the indicator thread starts,
            # otherwise it can end up appended to the "録音中..." line.
            if self.input_sample_rate == TARGET_SAMPLE_RATE:
                print(f"録音設定: {self.input_sample_rate} Hz mono", flush=True)
            else:
                print(
                    f"録音設定: {self.input_sample_rate} Hz mono -> {TARGET_SAMPLE_RATE} Hz mono",
                    flush=True,
                )
            self.indicator_stop.clear()
            self.indicator_thread = threading.Thread(
                target=self._show_recording_indicator,
                daemon=True,
            )
            self.indicator_thread.start()

    def stop(self) -> Recording | None:
        """Stop capturing and return the recording.

        Returns:
            A ``Recording`` pointing at a temporary WAV file, or ``None``
            when no recording was running or the captured audio is shorter
            than ``MIN_RECORD_SECONDS``.
        """
        with self.lock:
            if not self.is_recording:
                return None
            stream = self.stream
            indicator_thread = self.indicator_thread
            self.stream = None
            self.indicator_thread = None
            self.is_recording = False
            self.indicator_stop.set()
        # Tear down outside the lock: the audio callback also takes it.
        try:
            if stream is not None:
                try:
                    stream.stop()
                finally:
                    # Always release the device, even if stop() failed.
                    stream.close()
        finally:
            if indicator_thread is not None:
                indicator_thread.join()
            # Terminate the "録音中..." indicator line.
            print("", flush=True)
        audio = self._joined_audio()
        duration_seconds = len(audio) / self.input_sample_rate
        if duration_seconds < MIN_RECORD_SECONDS:
            print("録音が短すぎるため送信しません", flush=True)
            return None
        audio = self._resample_to_target(audio)
        path = self._write_wav(audio)
        return Recording(path=path, duration_seconds=duration_seconds)

    def set_transcribing(self, value: bool) -> None:
        """Mark whether a transcription is in progress (blocks ``start()``)."""
        with self.lock:
            self.is_transcribing = value

    def _on_audio(self, indata: np.ndarray, _frames: int, _time, status) -> None:
        """Stream callback: store a copy of each incoming block while recording."""
        if status:
            print(f"録音警告: {status}", flush=True)
        with self.lock:
            if self.is_recording:
                # Copy so the stored block does not alias the callback's buffer.
                self.frames.append(indata.copy())

    def _joined_audio(self) -> np.ndarray:
        """Take all buffered blocks and return them as one array."""
        with self.lock:
            frames = list(self.frames)
            self.frames = []
        if not frames:
            return np.empty((0, self.channels), dtype=np.float32)
        return np.concatenate(frames, axis=0)

    def _write_wav(self, audio: np.ndarray) -> Path:
        """Write ``audio`` as 16-bit PCM to a new temporary WAV file.

        The file is created with ``delete=False``; the caller owns it and
        is responsible for removing it. If writing fails the file is
        removed here before the exception is re-raised.
        """
        pcm = np.clip(audio, -1.0, 1.0)
        pcm = (pcm * 32767).astype(np.int16)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            path = Path(tmp.name)
        try:
            with wave.open(str(path), "wb") as wav:
                wav.setnchannels(self.channels)
                wav.setsampwidth(2)
                wav.setframerate(TARGET_SAMPLE_RATE)
                wav.writeframes(pcm.tobytes())
        except BaseException:
            # Cleanup only; the error is re-raised unchanged.
            path.unlink(missing_ok=True)
            raise
        return path

    def _open_input_stream(self) -> sd.InputStream:
        """Open an input stream, preferring ``TARGET_SAMPLE_RATE``.

        If the stream cannot be created at the target rate, retry with the
        device's default sample rate and record that rate in
        ``self.input_sample_rate`` so ``stop()`` can resample.
        """
        try:
            self.input_sample_rate = TARGET_SAMPLE_RATE
            return sd.InputStream(
                samplerate=TARGET_SAMPLE_RATE,
                channels=self.channels,
                dtype="float32",
                callback=self._on_audio,
            )
        except Exception:
            # Best-effort fallback; an error from this second attempt propagates.
            self.input_sample_rate = self.default_sample_rate
            return sd.InputStream(
                samplerate=self.default_sample_rate,
                channels=self.channels,
                dtype="float32",
                callback=self._on_audio,
            )

    def _resample_to_target(self, audio: np.ndarray) -> np.ndarray:
        """Linearly interpolate the first channel to ``TARGET_SAMPLE_RATE``.

        Returns ``audio`` unchanged when it is empty or was already
        captured at the target rate. No low-pass filtering is applied.
        """
        if self.input_sample_rate == TARGET_SAMPLE_RATE or len(audio) == 0:
            return audio
        source_length = len(audio)
        target_length = int(source_length * TARGET_SAMPLE_RATE / self.input_sample_rate)
        source_positions = np.arange(source_length)
        target_positions = np.linspace(0, source_length - 1, target_length)
        resampled = np.interp(target_positions, source_positions, audio[:, 0])
        return resampled.reshape(-1, 1).astype(np.float32)

    def _show_recording_indicator(self) -> None:
        """Print "録音中" followed by a dot every 0.25 s until stopped."""
        print("録音中", end="", flush=True)
        while not self.indicator_stop.wait(0.25):
            print(".", end="", flush=True)
class RawTerminal:
def __init__(self) -> None:
self.fd: int | None = None
self.original_settings: list[int] | None = None
def __enter__(self):
if not sys.stdin.isatty():
return self
self.fd = sys.stdin.fileno()
self.original_settings = termios.tcgetattr(self.fd)
tty.setcbreak(self.fd)
return self
def __exit__(self, _exc_type, _exc, _tb) -> None:
if self.fd is not None and self.original_settings is not None:
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.original_settings)
def read_key(self, timeout_seconds: float) -> str | None:
if self.fd is None:
return None
readable, _, _ = select.select([sys.stdin], [], [], timeout_seconds)
if not readable:
return None
return sys.stdin.read(1)
def transcribe(client: Groq, recording: Recording, model: str) -> None:
    """Upload a recording for transcription and print the outcome.

    Prints the audio duration and upload size, then either the
    recognition time and text or a failure message. The recording's file
    is removed afterwards whether or not the request succeeded.

    Args:
        client: Groq client used to issue the transcription request.
        recording: Recording whose WAV file is uploaded.
        model: Name of the speech-to-text model to request.
    """
    transfer_bytes = recording.path.stat().st_size
    print(
        f"文字起こし中... 音声 {recording.duration_seconds:.2f} 秒 / 転送 {transfer_bytes} bytes",
        flush=True,
    )
    request_options = {
        "model": model,
        "language": LANGUAGE,
        "response_format": "json",
        "temperature": 0,
    }
    try:
        with recording.path.open("rb") as audio_file:
            started_at = time.perf_counter()
            result = client.audio.transcriptions.create(file=audio_file, **request_options)
            finished_at = time.perf_counter()
        elapsed_ms = (finished_at - started_at) * 1000
        print(f"認識時間: {elapsed_ms:.0f} ms", flush=True)
        print(f"\n認識結果: {result.text}\n", flush=True)
    except Exception as exc:
        # Report the failure and carry on; the caller keeps running.
        print(f"文字起こしに失敗しました: {exc}", flush=True)
    finally:
        # The temporary WAV file is no longer needed either way.
        recording.path.unlink(missing_ok=True)
def main() -> None:
    """Run the push-to-talk loop: Space records, Esc quits.

    Recording starts when Space is read and stops when no further Space
    arrives within ``SPACE_RELEASE_TIMEOUT_SECONDS`` or another key is
    pressed. Each finished recording is transcribed on a background
    thread. Before returning, any transcription still in flight is
    awaited so its result is printed and its temporary file is removed.

    Raises:
        SystemExit: If ``GROQ_API_KEY`` is not set, or stdin is not a
            terminal (no key press could ever be read).
    """
    load_dotenv()
    if not os.getenv("GROQ_API_KEY"):
        raise SystemExit(".env に GROQ_API_KEY を設定してください")
    if not sys.stdin.isatty():
        # Without a terminal the key loop below could never receive input.
        raise SystemExit("標準入力が端末ではないため、キー入力を受け付けられません")
    model = os.getenv("GROQ_STT_MODEL", DEFAULT_MODEL)
    client = Groq()
    recorder = PushToTalkRecorder()
    print("設定:")
    print(f"- model: {model}")
    print(f"- language: {LANGUAGE}")
    print(f"- target sample rate: {TARGET_SAMPLE_RATE} Hz mono")
    print(f"- input device: {recorder.device_name}")
    print(f"- input default sample rate: {recorder.default_sample_rate} Hz")
    print("このターミナルにフォーカスした状態で Space を押して録音します。Esc で終了します。", flush=True)

    # Background transcription threads that may still be running.
    workers: list[threading.Thread] = []

    def transcribe_in_background(recording: Recording) -> None:
        """Transcribe ``recording`` on a worker thread."""
        recorder.set_transcribing(True)

        def worker() -> None:
            try:
                transcribe(client, recording, model)
            finally:
                recorder.set_transcribing(False)

        workers[:] = [thread for thread in workers if thread.is_alive()]
        thread = threading.Thread(target=worker, daemon=True)
        workers.append(thread)
        thread.start()

    def finish_recording() -> None:
        """Stop the recorder and hand any resulting audio to a worker."""
        recording = recorder.stop()
        if recording is not None:
            transcribe_in_background(recording)

    with RawTerminal() as terminal:
        quit_requested = False
        while not quit_requested:
            key = terminal.read_key(timeout_seconds=0.1)
            if key == "\x1b":
                break
            if key != " ":
                continue
            recorder.start()
            while True:
                key = terminal.read_key(timeout_seconds=SPACE_RELEASE_TIMEOUT_SECONDS)
                if key == " ":
                    # Another Space arrived in time: keep recording.
                    continue
                finish_recording()
                quit_requested = key == "\x1b"
                break

    # The workers are daemon threads and would be killed at interpreter
    # exit, losing the result and leaking the temporary WAV file. Wait
    # for them after the terminal has been restored.
    for thread in workers:
        thread.join()
# Run the push-to-talk loop only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment