@kanzure
Created September 17, 2025 13:59
Python script (run via uv) for speech-to-text using Whisper on Groq; includes audio chunking and input device support
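It needs ffmpeg on the PATH (both pydub and the chunked path shell out to it) and a GROQ_API_KEY environment variable; the Python dependencies (groq, pydub, soundfile, sounddevice) are declared inline in the shebang and resolved by uv at run time.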
#!/usr/bin/env -S uv run --script --with=groq --with=pydub --with=soundfile --with=sounddevice
# originally from aider's voice.py
# and also https://raw.githubusercontent.com/groq/groq-api-cookbook/refs/heads/main/tutorials/audio-chunking/audio_chunking_code.py
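#
# Example usage (the script filename is hypothetical; GROQ_API_KEY must be set):
#   ./transcribe.py                           # record from the default mic, press ENTER to stop
#   ./transcribe.py talk.mp3 --language en    # transcribe an existing audio file
#   ./transcribe.py --device "USB Audio"      # record from a named input device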
import argparse
import json
import math
import os
import queue
import re
import subprocess
import tempfile
import time
import warnings
from datetime import datetime
from pathlib import Path
from groq import Groq, RateLimitError
warnings.filterwarnings(
    "ignore", message="Couldn't find ffmpeg or avconv - defaulting to ffmpeg, but may not work"
)
warnings.filterwarnings("ignore", category=SyntaxWarning)

from pydub import AudioSegment  # noqa
from pydub.exceptions import CouldntDecodeError, CouldntEncodeError  # noqa

try:
    import soundfile as sf
except (OSError, ModuleNotFoundError):
    sf = None


class SoundDeviceError(Exception):
    pass


class Voice:
    max_rms = 0
    min_rms = 1e5
    pct = 0
    threshold = 0.15

    def __init__(self, audio_format="wav", device_name=None):
        if sf is None:
            raise SoundDeviceError
        try:
            import sounddevice as sd

            self.sd = sd
            devices = sd.query_devices()
            if device_name:
                # Find the first device whose name contains the requested string.
                device_id = None
                for i, device in enumerate(devices):
                    if device_name in device["name"]:
                        device_id = i
                        break
                if device_id is None:
                    available_inputs = [d["name"] for d in devices if d["max_input_channels"] > 0]
                    raise ValueError(
                        f"Device '{device_name}' not found. Available input devices:"
                        f" {available_inputs}"
                    )
                print(f"Using input device: {device_name} (ID: {device_id})")
                self.device_id = device_id
            else:
                self.device_id = None
        except (OSError, ModuleNotFoundError):
            raise SoundDeviceError
        if audio_format not in ["wav", "mp3", "webm"]:
            raise ValueError(f"Unsupported audio format: {audio_format}")
        self.audio_format = audio_format

    def callback(self, indata, frames, time, status):
        """This is called (from a separate thread) for each audio block."""
        import numpy as np

        # Track the running RMS range so the level meter can be normalized.
        rms = np.sqrt(np.mean(indata**2))
        self.max_rms = max(self.max_rms, rms)
        self.min_rms = min(self.min_rms, rms)
        rng = self.max_rms - self.min_rms
        if rng > 0.001:
            self.pct = (rms - self.min_rms) / rng
        else:
            self.pct = 0.5
        self.q.put(indata.copy())

    def get_prompt(self):
        num = 10
        if math.isnan(self.pct) or self.pct < self.threshold:
            cnt = 0
        else:
            cnt = int(self.pct * 10)
        bar = "░" * cnt + "█" * (num - cnt)
        bar = bar[:num]
        dur = time.time() - self.start_time
        # prompt = f"Recording, press ENTER to send to Groq when done... {dur:.1f}sec {bar}"
        prompt = "Recording, press ENTER to send to Groq when done..."
        input(prompt)
        print("\n")

    def record_and_transcribe(self, history=None, language=None):
        try:
            return self.raw_record_and_transcribe(history, language)
        except KeyboardInterrupt:
            return
        except SoundDeviceError as e:
            print(f"Error: {e}")
            print("Please ensure you have a working audio input device connected and try again.")
            return

    def raw_record_and_transcribe(self, history, language):
        self.q = queue.Queue()
        temp_wav = tempfile.mktemp(suffix=".wav")
        try:
            sample_rate = int(self.sd.query_devices(self.device_id, "input")["default_samplerate"])
        except (TypeError, ValueError):
            sample_rate = 16000  # fall back to 16kHz if unable to query the device
        except self.sd.PortAudioError:
            raise SoundDeviceError(
                "No audio input device detected. Please check your audio settings and try again."
            )
        self.start_time = time.time()
        try:
            with self.sd.InputStream(
                samplerate=sample_rate, channels=1, callback=self.callback, device=self.device_id
            ):
                self.get_prompt()
        except self.sd.PortAudioError as err:
            raise SoundDeviceError(f"Error accessing audio input device: {err}")
        with sf.SoundFile(temp_wav, mode="x", samplerate=sample_rate, channels=1) as file:
            while not self.q.empty():
                file.write(self.q.get())
        use_audio_format = self.audio_format
        # Check the file size and switch to mp3 if the wav is too large to upload.
        file_size = os.path.getsize(temp_wav)
        if file_size > 24.9 * 1024 * 1024 and self.audio_format == "wav":
            print(f"\nWarning: {temp_wav} is too large, switching to mp3 format.")
            use_audio_format = "mp3"
        filename = temp_wav
        if use_audio_format != "wav":
            try:
                new_filename = tempfile.mktemp(suffix=f".{use_audio_format}")
                audio = AudioSegment.from_wav(temp_wav)
                audio.export(new_filename, format=use_audio_format)
                os.remove(temp_wav)
                filename = new_filename
            except (CouldntDecodeError, CouldntEncodeError) as e:
                print(f"Error converting audio: {e}")
            except (OSError, FileNotFoundError) as e:
                print(f"File system error during conversion: {e}")
            except Exception as e:
                print(f"Unexpected error during audio conversion: {e}")
        # If the recorded file is over 10MB, use chunked transcription.
        final_file_size = os.path.getsize(filename)
        if final_file_size > 10 * 1024 * 1024:  # 10MB
            print(
                f"Recorded file size ({final_file_size / (1024 * 1024):.1f}MB) exceeds 10MB,"
                " using chunked transcription..."
            )
            result = self.transcribe_audio_in_chunks(Path(filename))
            # Clean up the temp file.
            if filename != temp_wav:
                os.remove(filename)
            return result.get("text", "") if isinstance(result, dict) else result
        with open(filename, "rb") as fh:
            try:
                # Initialize the Groq client and transcribe the audio file.
                client = Groq(api_key=os.getenv("GROQ_API_KEY"))
                transcript = client.audio.transcriptions.create(
                    file=fh,
                    model="whisper-large-v3-turbo",
                    language=language,
                )
            except Exception as err:
                print(f"Unable to transcribe {filename}: {err}")
                return
        if filename != temp_wav:
            os.remove(filename)
        return transcript.text

    def transcribe_file(self, filename, language=None):
        """Transcribe an existing audio file."""
        if not os.path.exists(filename):
            raise FileNotFoundError(f"Audio file not found: {filename}")
        # If the file is over 10MB, use chunked transcription.
        file_size = os.path.getsize(filename)
        if file_size > 10 * 1024 * 1024:  # 10MB
            print(
                f"File size ({file_size / (1024 * 1024):.1f}MB) exceeds 10MB,"
                " using chunked transcription..."
            )
            result = self.transcribe_audio_in_chunks(Path(filename))
            return result.get("text", "") if isinstance(result, dict) else result
        with open(filename, "rb") as fh:
            try:
                # Initialize the Groq client and transcribe the audio file.
                client = Groq(api_key=os.getenv("GROQ_API_KEY"))
                transcript = client.audio.transcriptions.create(
                    file=fh,
                    model="whisper-large-v3-turbo",
                    language=language,
                )
            except Exception as err:
                print(f"Unable to transcribe {filename}: {err}")
                return
        return transcript.text

    def preprocess_audio(self, input_path: Path) -> Path:
        """
        Preprocess an audio file to 16kHz mono FLAC using ffmpeg.
        FLAC provides lossless compression for faster upload times.
        """
        if not input_path.exists():
            raise FileNotFoundError(f"Input file not found: {input_path}")
        with tempfile.NamedTemporaryFile(suffix=".flac", delete=False) as temp_file:
            output_path = Path(temp_file.name)
        print("Converting audio to 16kHz mono FLAC...")
        try:
            # Capture stderr so a failure message can be surfaced below.
            subprocess.run(
                [
                    "ffmpeg",
                    "-hide_banner",
                    "-loglevel", "error",
                    "-i", str(input_path),
                    "-ar", "16000",
                    "-ac", "1",
                    "-c:a", "flac",
                    "-y",
                    str(output_path),
                ],
                check=True,
                capture_output=True,
            )
            return output_path
        except subprocess.CalledProcessError as e:
            output_path.unlink(missing_ok=True)
            raise RuntimeError(f"FFmpeg conversion failed: {e.stderr.decode(errors='replace')}")
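    # The subprocess call above is equivalent to running (paths illustrative):
    #   ffmpeg -hide_banner -loglevel error -i input.mp3 -ar 16000 -ac 1 -c:a flac -y output.flac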

    def transcribe_single_chunk(self, client: Groq, chunk: AudioSegment, chunk_num: int, total_chunks: int) -> tuple[dict, float]:
        """
        Transcribe a single audio chunk with the Groq API, retrying on rate limits.
        """
        total_api_time = 0
        while True:
            with tempfile.NamedTemporaryFile(suffix=".flac") as temp_file:
                chunk.export(temp_file.name, format="flac")
                start_time = time.time()
                try:
                    result = client.audio.transcriptions.create(
                        file=("chunk.flac", temp_file, "audio/flac"),
                        model="whisper-large-v3-turbo",
                        language="en",
                        response_format="verbose_json",
                    )
                    api_time = time.time() - start_time
                    total_api_time += api_time
                    print(f"Chunk {chunk_num}/{total_chunks} processed in {api_time:.2f}s")
                    return result, total_api_time
                except RateLimitError:
                    print(f"\nRate limit hit for chunk {chunk_num} - retrying in 60 seconds...")
                    time.sleep(60)
                    continue
                except Exception as e:
                    print(f"Error transcribing chunk {chunk_num}: {str(e)}")
                    raise

    def find_longest_common_sequence(self, sequences: list[str], match_by_words: bool = True) -> str:
        """
        Find the optimal alignment between sequences using longest-common-sequence
        scoring with a sliding window.
        """
        if not sequences:
            return ""
        # Convert input based on the matching strategy: word tokens (keeping
        # their leading whitespace) or individual characters.
        if match_by_words:
            sequences = [
                [word for word in re.split(r"(\s+\w+)", seq) if word]
                for seq in sequences
            ]
        else:
            sequences = [list(seq) for seq in sequences]
        left_sequence = sequences[0]
        left_length = len(left_sequence)
        total_sequence = []
        for right_sequence in sequences[1:]:
            max_matching = 0.0
            right_length = len(right_sequence)
            max_indices = (left_length, left_length, 0, 0)
            # Try every possible overlap between the end of the left sequence
            # and the start of the right sequence.
            for i in range(1, left_length + right_length + 1):
                # Add epsilon to favor longer matches.
                eps = float(i) / 10000.0
                left_start = max(0, left_length - i)
                left_stop = min(left_length, left_length + right_length - i)
                left = left_sequence[left_start:left_stop]
                right_start = max(0, i - left_length)
                right_stop = min(right_length, i)
                right = right_sequence[right_start:right_stop]
                if len(left) != len(right):
                    raise RuntimeError(
                        "Mismatched subsequences detected during transcript merging."
                    )
                matches = sum(a == b for a, b in zip(left, right))
                # Normalize matches by position and add epsilon.
                matching = matches / float(i) + eps
                # Require at least 2 matches.
                if matches > 1 and matching > max_matching:
                    max_matching = matching
                    max_indices = (left_start, left_stop, right_start, right_stop)
            # Use the best alignment found: take the left half of the overlap
            # from the left sequence and the right half from the right sequence.
            left_start, left_stop, right_start, right_stop = max_indices
            left_mid = (left_stop + left_start) // 2
            right_mid = (right_stop + right_start) // 2
            total_sequence.extend(left_sequence[:left_mid])
            left_sequence = right_sequence[right_mid:]
            left_length = len(left_sequence)
        # Add the remaining sequence and join back into text. Word tokens keep
        # their leading whitespace, so a plain join works for both strategies.
        total_sequence.extend(left_sequence)
        return "".join(total_sequence)
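    # Illustrative sketch of the alignment above (made-up strings, not real output):
    #   left  = "and then we discussed the budget"
    #   right = "we discussed the budget for next year"
    # The sliding window scores each candidate overlap and picks
    # "we discussed the budget"; the merge keeps the left half of that overlap
    # from `left` and the right half from `right`, yielding
    # "and then we discussed the budget for next year".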

    def merge_transcripts(self, results: list[tuple[dict, int]]) -> dict:
        """
        Merge transcription chunks and handle overlaps.
        """
        print("\nMerging results...")
        # First, check whether any result contains segments.
        has_segments = False
        for chunk, _ in results:
            data = chunk.model_dump() if hasattr(chunk, "model_dump") else chunk
            if isinstance(data, dict) and data.get("segments"):
                has_segments = True
                break
        # Collect word-level timestamps regardless of segment presence.
        has_words = False
        words = []
        for chunk, chunk_start_ms in results:
            # Convert a Pydantic model to a dict.
            data = chunk.model_dump() if hasattr(chunk, "model_dump") else chunk
            if isinstance(data, dict) and data.get("words"):
                has_words = True
                # Shift word timestamps by the chunk's start time
                # (chunk_start_ms is milliseconds; word timestamps are seconds).
                chunk_words = data["words"]
                for word in chunk_words:
                    word["start"] = word["start"] + (chunk_start_ms / 1000)
                    word["end"] = word["end"] + (chunk_start_ms / 1000)
                words.extend(chunk_words)
            elif getattr(chunk, "words", None) is not None:
                has_words = True
                # Handle a Pydantic model for words; timestamps are shifted in
                # both branches (the model_dump path previously skipped this).
                processed_words = []
                for word in getattr(chunk, "words"):
                    if hasattr(word, "model_dump"):
                        word_dict = word.model_dump()
                        word_dict["start"] = word_dict.get("start", 0) + (chunk_start_ms / 1000)
                        word_dict["end"] = word_dict.get("end", 0) + (chunk_start_ms / 1000)
                    else:
                        # Create a dict from the word object.
                        word_dict = {
                            "word": getattr(word, "word", ""),
                            "start": getattr(word, "start", 0) + (chunk_start_ms / 1000),
                            "end": getattr(word, "end", 0) + (chunk_start_ms / 1000),
                        }
                    processed_words.append(word_dict)
                words.extend(processed_words)
        # Without segments, just merge the full texts.
        if not has_segments:
            print("No segments found in transcription results. Merging full texts only.")
            texts = []
            for chunk, _ in results:
                data = chunk.model_dump() if hasattr(chunk, "model_dump") else chunk
                # Handle both dictionary and object access for the text.
                if isinstance(data, dict):
                    text = data.get("text", "")
                else:
                    text = getattr(chunk, "text", "")
                texts.append(text)
            result = {"text": " ".join(texts)}
            # Include word-level timestamps if available.
            if has_words:
                result["words"] = words
            # Return an empty segments list since no segments were present.
            result["segments"] = []
            return result
        # Otherwise, merge segments across chunk boundaries.
        print("Merging segments across chunks...")
        final_segments = []
        processed_chunks = []
        for i, (chunk, chunk_start_ms) in enumerate(results):
            data = chunk.model_dump() if hasattr(chunk, "model_dump") else chunk
            # Handle both dictionary and object access for segments.
            if isinstance(data, dict):
                segments = data.get("segments", [])
            else:
                segments = getattr(chunk, "segments", [])
            # Convert segments to a list if needed.
            if hasattr(segments, "model_dump"):
                segments = segments.model_dump()
            elif not isinstance(segments, list):
                segments = []
            # Normalize every segment to a plain dict up front.
            dict_segments = []
            for segment in segments:
                if isinstance(segment, dict):
                    dict_segments.append(segment)
                elif hasattr(segment, "model_dump"):
                    dict_segments.append(segment.model_dump())
                else:
                    dict_segments.append({
                        "text": getattr(segment, "text", ""),
                        "start": getattr(segment, "start", 0),
                        "end": getattr(segment, "end", 0),
                    })
            if i < len(results) - 1:
                # Not the last chunk: split segments into those that end before
                # the next chunk starts and those that overlap it.
                next_start = results[i + 1][1]  # milliseconds
                current_segments = []
                overlap_segments = []
                for segment in dict_segments:
                    # Compare the segment end (seconds, converted to ms) with
                    # the next chunk's start time.
                    if segment["end"] * 1000 > next_start:
                        overlap_segments.append(segment)
                    else:
                        current_segments.append(segment)
                # Collapse the overlapping segments into one.
                if overlap_segments:
                    merged_overlap = overlap_segments[0].copy()
                    merged_overlap.update({
                        "text": " ".join(s.get("text", "") for s in overlap_segments),
                        "end": overlap_segments[-1].get("end", 0),
                    })
                    current_segments.append(merged_overlap)
                processed_chunks.append(current_segments)
            else:
                processed_chunks.append(dict_segments)
        # Merge boundaries between consecutive chunks.
        for i in range(len(processed_chunks) - 1):
            # Skip if either chunk has no segments.
            if not processed_chunks[i] or not processed_chunks[i + 1]:
                continue
            # Add all segments except the last from the current chunk.
            if len(processed_chunks[i]) > 1:
                final_segments.extend(processed_chunks[i][:-1])
            # Merge the boundary segments via overlap alignment.
            last_segment = processed_chunks[i][-1]
            first_segment = processed_chunks[i + 1][0]
            merged_text = self.find_longest_common_sequence([
                last_segment.get("text", ""),
                first_segment.get("text", ""),
            ])
            merged_segment = last_segment.copy()
            merged_segment.update({
                "text": merged_text,
                "end": first_segment.get("end", 0),
            })
            final_segments.append(merged_segment)
            # Drop the next chunk's first segment: its text is already part of
            # the merged boundary segment, so it must not be emitted again.
            processed_chunks[i + 1] = processed_chunks[i + 1][1:]
        # Add all segments from the last chunk.
        if processed_chunks and processed_chunks[-1]:
            final_segments.extend(processed_chunks[-1])
        # Build the final transcription.
        final_text = " ".join(segment.get("text", "") for segment in final_segments)
        result = {
            "text": final_text,
            "segments": final_segments,
        }
        # Include word-level timestamps if available.
        if has_words:
            result["words"] = words
        return result
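    # Shape of the merged result returned above (values illustrative):
    #   {
    #       "text": "full transcript...",
    #       "segments": [{"text": ..., "start": ..., "end": ...}, ...],
    #       "words": [{"word": ..., "start": ..., "end": ...}, ...],  # only if word timestamps were present
    #   }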

    def save_results(self, result: dict, audio_path: Path) -> Path:
        """
        Save transcription results to files.
        """
        try:
            output_dir = Path("transcriptions")
            output_dir.mkdir(exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            base_path = output_dir / f"{Path(audio_path).stem}_{timestamp}"
            # Save results in several formats.
            with open(f"{base_path}.txt", "w", encoding="utf-8") as f:
                f.write(result["text"])
            with open(f"{base_path}_full.json", "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            with open(f"{base_path}_segments.json", "w", encoding="utf-8") as f:
                json.dump(result["segments"], f, indent=2, ensure_ascii=False)
            print("\nResults saved to the transcriptions folder:")
            print(f"- {base_path}.txt")
            print(f"- {base_path}_full.json")
            print(f"- {base_path}_segments.json")
            return base_path
        except IOError as e:
            print(f"Error saving results: {str(e)}")
            raise

    def transcribe_audio_in_chunks(self, audio_path: Path, chunk_length: int = 600, overlap: int = 10) -> dict:
        """
        Transcribe audio in overlapping chunks with Whisper via the Groq API.
        """
        api_key = os.getenv("GROQ_API_KEY")
        if not api_key:
            raise ValueError("GROQ_API_KEY environment variable not set")
        print(f"\nStarting chunked transcription of: {audio_path}")
        client = Groq(api_key=api_key, max_retries=0)
        processed_path = None
        try:
            # Preprocess the audio and get basic info.
            processed_path = self.preprocess_audio(audio_path)
            try:
                audio = AudioSegment.from_file(processed_path, format="flac")
            except Exception as e:
                raise RuntimeError(f"Failed to load audio: {str(e)}")
            duration = len(audio)
            print(f"Audio duration: {duration / 1000:.2f}s")
            # Calculate the number of chunks: each chunk advances by
            # (chunk_ms - overlap_ms).
            chunk_ms = chunk_length * 1000
            overlap_ms = overlap * 1000
            total_chunks = (duration // (chunk_ms - overlap_ms)) + 1
            print(f"Processing {total_chunks} chunks...")
            results = []
            total_transcription_time = 0
            # Extract and transcribe each chunk in turn.
            for i in range(total_chunks):
                start = i * (chunk_ms - overlap_ms)
                end = min(start + chunk_ms, duration)
                print(f"\nProcessing chunk {i + 1}/{total_chunks}")
                print(f"Time range: {start / 1000:.1f}s - {end / 1000:.1f}s")
                chunk = audio[start:end]
                result, chunk_time = self.transcribe_single_chunk(client, chunk, i + 1, total_chunks)
                total_transcription_time += chunk_time
                results.append((result, start))
            final_result = self.merge_transcripts(results)
            self.save_results(final_result, audio_path)
            print(f"\nTotal Groq API transcription time: {total_transcription_time:.2f}s")
            return final_result
        finally:
            # Clean up temp files whether or not transcription succeeded.
            if processed_path:
                Path(processed_path).unlink(missing_ok=True)
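    # Chunk timing with the defaults (chunk_length=600s, overlap=10s), illustrative:
    #   chunk 1:    0s -  600s
    #   chunk 2:  590s - 1190s
    #   chunk 3: 1180s - 1780s
    # Each chunk starts 590s after the previous one, so consecutive chunks share
    # a 10s overlap that merge_transcripts() reconciles.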


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Voice transcription using Groq Whisper")
    parser.add_argument("filename", nargs="?", help="Audio file to transcribe (optional)")
    parser.add_argument("--language", help="Language code for transcription")
    parser.add_argument("--device", help="Audio input device name")
    args = parser.parse_args()
    api_key = os.getenv("GROQ_API_KEY")
    if not api_key:
        raise ValueError("Please set the GROQ_API_KEY environment variable.")
    voice = Voice(device_name=args.device)
    if args.filename:
        # Transcribe the provided file.
        result = voice.transcribe_file(args.filename, language=args.language)
    else:
        # Record from the microphone and transcribe.
        result = voice.record_and_transcribe(language=args.language)
    if result:
        print(result)