Skip to content

Instantly share code, notes, and snippets.

@alber70g
Last active August 4, 2025 16:11
Show Gist options
  • Save alber70g/c13d116394f5da313d1b285d80d29599 to your computer and use it in GitHub Desktop.
Save alber70g/c13d116394f5da313d1b285d80d29599 to your computer and use it in GitHub Desktop.
ollama run --verbose hf.co/unsloth/Qwen3-30B-A3B-Instruct-2507-GGUF:Q4_K_M
"""
This is the library that is been used. We could switch to streaming?
parakeet_mlx/parakeet.py (this is the main interface, not the detailed implementation):
```
class BaseParakeet(nn.Module):
def __init__(self, preprocess_args: PreprocessArgs, encoder_args: ConformerArgs):
super().__init__()
self.preprocessor_config = preprocess_args
self.encoder_config = encoder_args
self.encoder = Conformer(encoder_args)
def transcribe(
self,
path: Path | str,
*,
dtype: mx.Dtype = mx.bfloat16,
chunk_duration: Optional[float] = None,
overlap_duration: float = 15.0,
chunk_callback: Optional[Callable] = None,
) -> AlignedResult:
def transcribe_stream(
self,
context_size: tuple[int, int] = (256, 256),
depth=1,
*,
keep_original_attention: bool = False,
decoding_config: DecodingConfig = DecodingConfig(),
) -> "StreamingParakeet":
return StreamingParakeet(
self,
context_size,
depth,
decoding_config=decoding_config,
keep_original_attention=keep_original_attention,
)
# streaming
class StreamingParakeet:
model: "BaseParakeet"
cache: List[ConformerCache]
audio_buffer: mx.array
mel_buffer: Optional[mx.array]
decoder_hidden: Optional[tuple[mx.array, mx.array]] = None
last_token: Optional[int] = None
finalized_tokens: list[AlignedToken]
draft_tokens: list[AlignedToken]
context_size: tuple[int, int]
depth: int
decoding_config: DecodingConfig
keep_original_attention: bool = False
def __init__(
self,
model: "BaseParakeet",
context_size: tuple[int, int],
depth: int = 1,
*,
keep_original_attention: bool = False,
decoding_config: DecodingConfig = DecodingConfig(),
) -> None:
self.context_size = context_size
self.depth = depth
self.decoding_config = decoding_config
self.keep_original_attention = keep_original_attention
self.model = model
self.cache = [
RotatingConformerCache(self.keep_size, cache_drop_size=self.drop_size)
for _ in range(len(model.encoder.layers))
]
self.audio_buffer = mx.array([])
self.mel_buffer = None
self.finalized_tokens = []
self.draft_tokens = []
def __enter__(self):
if not self.keep_original_attention:
self.model.encoder.set_attention_model(
"rel_pos_local_attn", self.context_size
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.keep_original_attention:
self.model.encoder.set_attention_model(
"rel_pos"
) # hard-coded; might cache if there's actually new varient than rel_pos
del self.audio_buffer
del self.cache
mx.clear_cache()
@property
def keep_size(self):
return self.context_size[0]
@property
def drop_size(self):
return self.context_size[1] * self.depth
@property
def result(self) -> AlignedResult:
return sentences_to_result(
tokens_to_sentences(self.finalized_tokens + self.draft_tokens)
)
def add_audio(self, audio: mx.array) -> None:
```
main.py:
```
from transcriber import ParakeetTranscriber
from translator import EnglishDutchTranslator
import time
import argparse
import sounddevice as sd
import numpy as np
import threading
import queue
SAMPLE_RATE = 16000 # Hz
CHUNK_DURATION = 20.0 # Seconds for each audio segment
OVERLAP_DURATION = 6.6 # Overlap between segments (seconds)
SILENCE_VOLUME_PERCENTAGE = 0.2 # Silence threshold
DEBUG = True
MODEL_ID = "mlx-community/parakeet-tdt-0.6b-v2"
GLOBAL_START_TIME = time.time()
parser = argparse.ArgumentParser()
parser.add_argument("--device", type=str, default="default", help="Device to use (e.g., default)")
args = parser.parse_args()
DEVICE = args.device
print("⚙️ Loading Parakeet-MLX model...")
transcriber = ParakeetTranscriber(model_id=MODEL_ID, debug=DEBUG)
print("✅ Transcriber initialized!")
print("⚙️ Loading Dutch translation model...")
translator = EnglishDutchTranslator(debug=DEBUG)
print("✅ Translator initialized!")
transcribe_time = 0
translate_time = 0
audio_buffer = np.array([], dtype=np.float32)
buffer_lock = threading.Lock()
audio_queue = queue.Queue()
running = True
last_sentence_start_time = 0
last_sentence_end_time = 0
def audio_callback(indata, frames, time, status):
if status:
print(f"Audio status: {status}")
# Append new audio data to buffer
with buffer_lock:
global audio_buffer
audio_buffer = np.append(audio_buffer, indata[:, 0])
def process_audio():
global audio_buffer, running
# Calculate sizes in samples
chunk_size = int(CHUNK_DURATION * SAMPLE_RATE)
overlap_size = int(OVERLAP_DURATION * SAMPLE_RATE)
step_size = chunk_size - overlap_size
if step_size <= 0:
raise ValueError("Invalid chunk/overlap configuration")
while running:
with buffer_lock:
if len(audio_buffer) < chunk_size:
continue
# Extract chunk
segment = audio_buffer[:chunk_size].copy()
# Advance buffer (keep overlap for next chunk)
audio_buffer = audio_buffer[step_size:]
buffer_end_time = time.time() - GLOBAL_START_TIME
# Submit for processing
handle_segment(segment, end_time=buffer_end_time)
def on_translation_complete(dutch_text, error):
if error:
print(f"Error translating text: {error}")
else:
global translate_time
current_time = time.time()
print(f"(translate {current_time - translate_time:.2f}s) Target: {dutch_text}")
# Send async POST to localhost:5353/publish
import requests
try:
requests.post(
"http://localhost:5353/publish",
json={"text": dutch_text, "timestamp": current_time}
)
except Exception as e:
print(f"POST request failed: {e}")
def create_transcription_callback(start_time, end_time):
print(f"Creating callback for segment from {start_time:.2f} to {end_time:.2f}")
def on_transcription_complete(results, error):
if error:
print(f"Error processing audio: {error}")
return
global transcribe_time, translate_time, last_sentence_end_time
current_time = time.time()
# Validate results
if not results or not results[0]:
print("No transcription result received.")
return
result = results[0]
# Print debug info
if result.sentences:
abs_start = start_time + result.sentences[0].start
print(f"Transcribed sentences ({abs_start:.2f}-{end_time:.2f}):")
for s in result.sentences:
print(f" {start_time + s.start:.2f}: {s.text}")
# Filter sentences: start < (end_time - OVERLAP_DURATION)
threshold = end_time - OVERLAP_DURATION
sentences = []
for sentence in result.sentences:
abs_start = start_time + sentence.start
if abs_start >= threshold:
break # Stop at first sentence in overlap region
sentences.append(sentence)
print(f"[{len(sentences)} sentences extracted]")
# Build text output and update global end_time
text = ""
if sentences:
last_sentence = sentences[-1]
last_sentence_end_time = start_time + last_sentence.end
for s in sentences:
text += f"{start_time + s.start:.2f}: {s.text}\n"
print(f"(transcribe {current_time - transcribe_time:.2f}s) Source:\n{text}")
# Uncomment to trigger translation
# if text.strip():
# transcribe_time = current_time
# translate_time = current_time
# translator.translate_async(text, callback=on_translation_complete)
return on_transcription_complete
def handle_segment(audio_data, end_time):
global transcribe_time
transcribe_time = time.time()
start_time = end_time - CHUNK_DURATION
# Use the transcriber's async API
transcriber.transcribe_async(audio_data,
callback=create_transcription_callback(start_time, end_time))
# Start audio processing thread
processing_thread = threading.Thread(target=process_audio, daemon=True)
processing_thread.start()
# Start audio input stream
stream = sd.InputStream(
samplerate=SAMPLE_RATE,
channels=1,
callback=audio_callback,
# device=DEVICE
)
print("🎤 Starting audio capture...")
with stream:
try:
while running:
time.sleep(0.1)
except KeyboardInterrupt:
print("Shutting down...")
running = False
processing_thread.join(timeout=1.0)
transcriber.shutdown()
print("All services stopped.")
```
transcriber/transcriber.py:
```
import traceback
from typing import Union, Optional, List, Dict, Any
import numpy as np
import mlx.core as mx
from concurrent.futures import ThreadPoolExecutor
from parakeet_mlx import from_pretrained
from parakeet_mlx.audio import get_logmel
DEBUG = False # Set to True for debug output
class ParakeetTranscriber:
def __init__(
self,
model_id: str = "mlx-community/parakeet-tdt-0.6b-v2",
dtype: mx.Dtype = mx.bfloat16,
max_workers: int = 3,
debug=False,
):
self.model_id = model_id
self.dtype = dtype
self.model = None
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.sample_rate = 16000 # Parakeet expects 16kHz audio
global DEBUG
DEBUG = debug
def load_model(self) -> None:
if self.model is None:
self.model = from_pretrained(self.model_id, dtype=self.dtype)
def transcribe(self, audio: Union[bytes, np.ndarray, mx.array]):
if len(audio) == 0:
if DEBUG:
print("Received empty audio data, returning empty transcription.")
return ""
self.load_model()
audio_mx = self._prepare_audio(audio)
# Process audio with the model
mel = get_logmel(audio_mx, self.model.preprocessor_config)
mx.eval(mel)
# Generate transcription
results = self.model.generate(mel)
# if DEBUG:
# print(f"Transcription results: {results}")
return results
def transcribe_async(
self, audio: Union[bytes, np.ndarray, mx.array], callback
):
self.load_model()
def _process_and_callback():
try:
result = self.transcribe(audio)
callback(result, None)
except Exception as e:
if callback:
callback(None, e)
raise e
return self.executor.submit(_process_and_callback)
def _prepare_audio(self, audio: Union[bytes, np.ndarray, mx.array]) -> mx.array:
if isinstance(audio, bytes):
# Convert from raw bytes (assuming int16 format)
audio_np = np.frombuffer(audio, dtype=np.int16).astype(np.float32)
audio_np /= 32768.0 # Normalize to [-1, 1]
return mx.array(audio_np)
elif isinstance(audio, np.ndarray):
# If already numpy array, just ensure it's float32 normalized
if audio.dtype != np.float32:
audio = audio.astype(np.float32)
if audio.max() > 1.0 or audio.min() < -1.0:
audio /= max(abs(audio.max()), abs(audio.min()))
return mx.array(audio)
elif isinstance(audio, mx.array):
# Already MLX array, ensure it's normalized
return audio
else:
raise ValueError(f"Unsupported audio type: {type(audio)}")
def shutdown(self):
if hasattr(self, "executor"):
self.executor.shutdown(wait=False)
```
To give the best performance, we should improve this code.
I suggest we use streaming instead of processing chunks.
Rewrite the code for both main.py and transcriber/transcriber.py to use the streaming interface of the Parakeet model.
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment