Created
April 29, 2026 02:42
-
-
Save GOROman/868e2007ca621fa6113fb71f6a9ba5de to your computer and use it in GitHub Desktop.
Groq - Speech to Text (STT) API のテスト
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import select | |
| import sys | |
| import termios | |
| import tempfile | |
| import threading | |
| import time | |
| import tty | |
| import wave | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import numpy as np | |
| import sounddevice as sd | |
| from dotenv import load_dotenv | |
| from groq import Groq | |
# Transcription model used when GROQ_STT_MODEL is not set in the environment.
DEFAULT_MODEL: str = "whisper-large-v3-turbo"
# Language code sent with every transcription request.
LANGUAGE: str = "ja"
# Recordings shorter than this many seconds are discarded instead of uploaded.
MIN_RECORD_SECONDS: float = 0.15
# How long to wait for another Space key event before treating Space as released.
SPACE_RELEASE_TIMEOUT_SECONDS: float = 0.9
# Sample rate (Hz) of the mono WAV file that gets uploaded.
TARGET_SAMPLE_RATE: int = 16_000
@dataclass
class Recording:
    """A finished recording that has been written to a WAV file on disk."""

    # Location of the WAV file holding the captured audio.
    path: Path
    # Length of the captured audio, in seconds.
    duration_seconds: float
class PushToTalkRecorder:
    """Capture microphone audio between ``start()`` and ``stop()`` calls.

    Audio is collected as float32 mono frames by a ``sounddevice`` input
    stream callback. ``stop()`` joins the frames, resamples them to
    ``TARGET_SAMPLE_RATE`` when the device had to be opened at another rate,
    and writes a 16-bit PCM WAV file to a temporary path.
    """

    def __init__(self) -> None:
        """Query the default input device and set up idle recorder state."""
        device_info = sd.query_devices(kind="input")
        self.device_name = str(device_info["name"])
        self.default_sample_rate = int(device_info["default_samplerate"])
        # Rate the stream is actually opened at; updated by _open_input_stream().
        self.input_sample_rate = TARGET_SAMPLE_RATE
        self.channels = 1
        self.frames: list[np.ndarray] = []
        self.stream: sd.InputStream | None = None
        # Guards frames / stream / is_recording / is_transcribing, which are
        # touched from the caller's thread and from the audio callback.
        self.lock = threading.Lock()
        self.is_recording = False
        self.is_transcribing = False
        self.indicator_stop = threading.Event()
        self.indicator_thread: threading.Thread | None = None

    def start(self) -> None:
        """Begin recording unless a recording or transcription is in progress.

        Raises:
            Whatever opening or starting the input stream raises. A stream
            that was opened but failed to start is closed before re-raising.
        """
        with self.lock:
            if self.is_recording or self.is_transcribing:
                return
            self.frames = []
            stream = self._open_input_stream()
            try:
                stream.start()
            except Exception:
                # Do not leak the opened device handle when start() fails.
                stream.close()
                raise
            self.stream = stream
            self.is_recording = True
            # Print the settings line BEFORE the indicator thread starts so it
            # cannot be interleaved with the "録音中..." progress output.
            if self.input_sample_rate == TARGET_SAMPLE_RATE:
                print(f"録音設定: {self.input_sample_rate} Hz mono", flush=True)
            else:
                print(
                    f"録音設定: {self.input_sample_rate} Hz mono -> {TARGET_SAMPLE_RATE} Hz mono",
                    flush=True,
                )
            self.indicator_stop.clear()
            self.indicator_thread = threading.Thread(
                target=self._show_recording_indicator,
                daemon=True,
            )
            self.indicator_thread.start()

    def stop(self) -> Recording | None:
        """Stop recording and return the result as a WAV-backed ``Recording``.

        Returns:
            ``None`` when no recording was active or when the captured audio
            is shorter than ``MIN_RECORD_SECONDS``; otherwise a ``Recording``
            pointing at a temporary WAV file the caller must delete.
        """
        with self.lock:
            if not self.is_recording:
                return None
            stream = self.stream
            indicator_thread = self.indicator_thread
            self.stream = None
            self.indicator_thread = None
            self.is_recording = False
        # Everything below runs without the lock: the audio callback also
        # takes the lock, so it must stay free while the stream shuts down.
        self.indicator_stop.set()
        try:
            if stream is not None:
                try:
                    stream.stop()
                finally:
                    stream.close()
        finally:
            if indicator_thread is not None:
                indicator_thread.join()
        # Terminate the indicator's unfinished progress line.
        print("", flush=True)
        audio = self._joined_audio()
        duration_seconds = len(audio) / self.input_sample_rate
        if duration_seconds < MIN_RECORD_SECONDS:
            print("録音が短すぎるため送信しません", flush=True)
            return None
        audio = self._resample_to_target(audio)
        path = self._write_wav(audio)
        return Recording(path=path, duration_seconds=duration_seconds)

    def set_transcribing(self, value: bool) -> None:
        """Set the flag that makes ``start()`` refuse to begin a new recording."""
        with self.lock:
            self.is_transcribing = value

    def _on_audio(self, indata: np.ndarray, _frames: int, _time, status) -> None:
        """Stream callback: store a copy of each incoming block while recording."""
        if status:
            print(f"録音警告: {status}", flush=True)
        with self.lock:
            if self.is_recording:
                # Copy because the callback buffer is only handed to us for
                # the duration of this call.
                self.frames.append(indata.copy())

    def _joined_audio(self) -> np.ndarray:
        """Return all buffered frames as one array and clear the buffer.

        Returns an empty ``(0, channels)`` float32 array when nothing was
        captured.
        """
        with self.lock:
            frames = self.frames
            self.frames = []
        if not frames:
            return np.empty((0, self.channels), dtype=np.float32)
        return np.concatenate(frames, axis=0)

    def _write_wav(self, audio: np.ndarray) -> Path:
        """Write ``audio`` as 16-bit PCM to a new temporary ``.wav`` file.

        Samples are clipped to [-1.0, 1.0] before scaling to int16. The file
        is removed again if writing fails, so a failed write does not leave a
        stray temp file behind.
        """
        pcm = np.clip(audio, -1.0, 1.0)
        pcm = (pcm * 32767).astype(np.int16)
        # Reserve a unique path; delete=False because the wave module
        # re-opens it by name.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            path = Path(tmp.name)
        try:
            with wave.open(str(path), "wb") as wav:
                wav.setnchannels(self.channels)
                wav.setsampwidth(2)
                wav.setframerate(TARGET_SAMPLE_RATE)
                wav.writeframes(pcm.tobytes())
        except Exception:
            path.unlink(missing_ok=True)
            raise
        return path

    def _open_input_stream(self) -> sd.InputStream:
        """Open the input stream, preferring ``TARGET_SAMPLE_RATE``.

        Falls back to the device's default sample rate when PortAudio rejects
        the target rate, and records the chosen rate in
        ``self.input_sample_rate``.
        """
        try:
            self.input_sample_rate = TARGET_SAMPLE_RATE
            return sd.InputStream(
                samplerate=TARGET_SAMPLE_RATE,
                channels=self.channels,
                dtype="float32",
                callback=self._on_audio,
            )
        except sd.PortAudioError:
            # Only PortAudio failures trigger the fallback; programming
            # errors propagate instead of being masked by a second attempt.
            self.input_sample_rate = self.default_sample_rate
            return sd.InputStream(
                samplerate=self.default_sample_rate,
                channels=self.channels,
                dtype="float32",
                callback=self._on_audio,
            )

    def _resample_to_target(self, audio: np.ndarray) -> np.ndarray:
        """Linearly interpolate ``audio`` from the input rate to the target rate.

        Only the first channel is used. The input is returned unchanged when
        it is empty or already at ``TARGET_SAMPLE_RATE``.
        NOTE(review): plain linear interpolation applies no anti-aliasing
        filter when downsampling.
        """
        if self.input_sample_rate == TARGET_SAMPLE_RATE or len(audio) == 0:
            return audio
        source_length = len(audio)
        target_length = int(source_length * TARGET_SAMPLE_RATE / self.input_sample_rate)
        source_positions = np.arange(source_length)
        target_positions = np.linspace(0, source_length - 1, target_length)
        resampled = np.interp(target_positions, source_positions, audio[:, 0])
        return resampled.reshape(-1, 1).astype(np.float32)

    def _show_recording_indicator(self) -> None:
        """Print "録音中" followed by one dot every 0.25 s until told to stop."""
        print("録音中", end="", flush=True)
        while not self.indicator_stop.wait(0.25):
            print(".", end="", flush=True)
| class RawTerminal: | |
| def __init__(self) -> None: | |
| self.fd: int | None = None | |
| self.original_settings: list[int] | None = None | |
| def __enter__(self): | |
| if not sys.stdin.isatty(): | |
| return self | |
| self.fd = sys.stdin.fileno() | |
| self.original_settings = termios.tcgetattr(self.fd) | |
| tty.setcbreak(self.fd) | |
| return self | |
| def __exit__(self, _exc_type, _exc, _tb) -> None: | |
| if self.fd is not None and self.original_settings is not None: | |
| termios.tcsetattr(self.fd, termios.TCSADRAIN, self.original_settings) | |
| def read_key(self, timeout_seconds: float) -> str | None: | |
| if self.fd is None: | |
| return None | |
| readable, _, _ = select.select([sys.stdin], [], [], timeout_seconds) | |
| if not readable: | |
| return None | |
| return sys.stdin.read(1) | |
def transcribe(client: Groq, recording: Recording, model: str) -> None:
    """Upload a recording for transcription, print the outcome, delete the file.

    Args:
        client: Groq API client used for the request.
        recording: The WAV recording to send; its file is always removed
            afterwards, whether the request succeeded or not.
        model: Name of the transcription model to request.

    Any exception raised while sending the request is reported on stdout
    rather than propagated.
    """
    wav_path = recording.path
    size_in_bytes = wav_path.stat().st_size
    print(
        f"文字起こし中... 音声 {recording.duration_seconds:.2f} 秒 / 転送 {size_in_bytes} bytes",
        flush=True,
    )
    try:
        with wav_path.open("rb") as wav_file:
            request_started = time.perf_counter()
            response = client.audio.transcriptions.create(
                file=wav_file,
                model=model,
                language=LANGUAGE,
                response_format="json",
                temperature=0,
            )
            request_finished = time.perf_counter()
        # Report the round-trip time of the API call in milliseconds.
        print(f"認識時間: {(request_finished - request_started) * 1000:.0f} ms", flush=True)
        print(f"\n認識結果: {response.text}\n", flush=True)
    except Exception as exc:
        print(f"文字起こしに失敗しました: {exc}", flush=True)
    finally:
        # The temp file may already be gone; that is not an error.
        wav_path.unlink(missing_ok=True)
def main() -> None:
    """Run the push-to-talk loop: hold Space to record, Esc to quit.

    Each finished recording is transcribed on a background thread so the key
    loop stays responsive. Before returning, any transcription still in
    flight is waited for, so its result is printed and its temporary WAV file
    is cleaned up instead of being cut off at interpreter exit.

    Raises:
        SystemExit: if GROQ_API_KEY is not configured or stdin is not a
            terminal.
    """
    load_dotenv()
    if not os.getenv("GROQ_API_KEY"):
        raise SystemExit(".env に GROQ_API_KEY を設定してください")
    if not sys.stdin.isatty():
        # Without a TTY no key can ever be read, so the loop below would
        # never be able to start a recording or exit.
        raise SystemExit("標準入力が端末ではありません。ターミナルから直接実行してください")
    model = os.getenv("GROQ_STT_MODEL", DEFAULT_MODEL)
    client = Groq()
    recorder = PushToTalkRecorder()
    print("設定:")
    print(f"- model: {model}")
    print(f"- language: {LANGUAGE}")
    print(f"- target sample rate: {TARGET_SAMPLE_RATE} Hz mono")
    print(f"- input device: {recorder.device_name}")
    print(f"- input default sample rate: {recorder.default_sample_rate} Hz")
    print("このターミナルにフォーカスした状態で Space を押して録音します。Esc で終了します。", flush=True)

    # Most recently started transcription thread. The recorder refuses to
    # start while one is running, so at most one can be in flight.
    worker_thread: threading.Thread | None = None

    def transcribe_in_background(recording: Recording) -> None:
        """Transcribe on a worker thread, blocking new recordings meanwhile."""
        nonlocal worker_thread
        recorder.set_transcribing(True)

        def worker() -> None:
            try:
                transcribe(client, recording, model)
            finally:
                recorder.set_transcribing(False)

        worker_thread = threading.Thread(target=worker, daemon=True)
        worker_thread.start()

    with RawTerminal() as terminal:
        quit_requested = False
        while not quit_requested:
            key = terminal.read_key(timeout_seconds=0.1)
            if key == "\x1b":
                break
            if key != " ":
                continue
            recorder.start()
            while True:
                key = terminal.read_key(timeout_seconds=SPACE_RELEASE_TIMEOUT_SECONDS)
                if key == " ":
                    # Key-repeat event: Space is still being held down.
                    continue
                # Timeout (Space released) or any other key ends the recording.
                recording = recorder.stop()
                if recording is not None:
                    transcribe_in_background(recording)
                if key == "\x1b":
                    quit_requested = True
                break

    # The worker is a daemon thread; returning without joining would kill it
    # mid-request, dropping the result and leaking the temporary WAV file.
    if worker_thread is not None:
        worker_thread.join()
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment