ollama run --verbose hf.co/unsloth/Qwen3-30B-A3B-Instruct-2507-GGUF:Q4_K_M
| """ | |
| This is the library that is been used. We could switch to streaming? | |
| parakeet_mlx/parakeet.py (this is the main interface, not the detailed implementation): | |
| ``` | |
class BaseParakeet(nn.Module):
    def __init__(self, preprocess_args: PreprocessArgs, encoder_args: ConformerArgs):
        super().__init__()
        self.preprocessor_config = preprocess_args
        self.encoder_config = encoder_args
        self.encoder = Conformer(encoder_args)

    def transcribe(
        self,
        path: Path | str,
        *,
        dtype: mx.Dtype = mx.bfloat16,
        chunk_duration: Optional[float] = None,
        overlap_duration: float = 15.0,
        chunk_callback: Optional[Callable] = None,
    ) -> AlignedResult: ...

    def transcribe_stream(
        self,
        context_size: tuple[int, int] = (256, 256),
        depth=1,
        *,
        keep_original_attention: bool = False,
        decoding_config: DecodingConfig = DecodingConfig(),
    ) -> "StreamingParakeet":
        return StreamingParakeet(
            self,
            context_size,
            depth,
            decoding_config=decoding_config,
            keep_original_attention=keep_original_attention,
        )


# streaming
class StreamingParakeet:
    model: "BaseParakeet"
    cache: List[ConformerCache]
    audio_buffer: mx.array
    mel_buffer: Optional[mx.array]
    decoder_hidden: Optional[tuple[mx.array, mx.array]] = None
    last_token: Optional[int] = None
    finalized_tokens: list[AlignedToken]
    draft_tokens: list[AlignedToken]
    context_size: tuple[int, int]
    depth: int
    decoding_config: DecodingConfig
    keep_original_attention: bool = False

    def __init__(
        self,
        model: "BaseParakeet",
        context_size: tuple[int, int],
        depth: int = 1,
        *,
        keep_original_attention: bool = False,
        decoding_config: DecodingConfig = DecodingConfig(),
    ) -> None:
        self.context_size = context_size
        self.depth = depth
        self.decoding_config = decoding_config
        self.keep_original_attention = keep_original_attention
        self.model = model
        self.cache = [
            RotatingConformerCache(self.keep_size, cache_drop_size=self.drop_size)
            for _ in range(len(model.encoder.layers))
        ]
        self.audio_buffer = mx.array([])
        self.mel_buffer = None
        self.finalized_tokens = []
        self.draft_tokens = []

    def __enter__(self):
        if not self.keep_original_attention:
            self.model.encoder.set_attention_model(
                "rel_pos_local_attn", self.context_size
            )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.keep_original_attention:
            self.model.encoder.set_attention_model(
                "rel_pos"
            )  # hard-coded; might cache this if there's ever a new variant beyond rel_pos
        del self.audio_buffer
        del self.cache
        mx.clear_cache()

    @property
    def keep_size(self):
        return self.context_size[0]

    @property
    def drop_size(self):
        return self.context_size[1] * self.depth

    @property
    def result(self) -> AlignedResult:
        return sentences_to_result(
            tokens_to_sentences(self.finalized_tokens + self.draft_tokens)
        )

    def add_audio(self, audio: mx.array) -> None: ...
```
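To make the streaming question concrete, here is a minimal usage sketch of this interface (the model id is taken from main.py below; the float32 input format and the `context_size` value are assumptions based on how the chunked pipeline feeds the model):
```
import mlx.core as mx
from parakeet_mlx import from_pretrained

model = from_pretrained("mlx-community/parakeet-tdt-0.6b-v2")

# Entering the context switches the encoder to local attention;
# exiting restores it and frees the rotating caches.
with model.transcribe_stream(context_size=(256, 256)) as stream:
    stream.add_audio(mx.array(samples))  # samples: hypothetical float32 chunk
    partial = stream.result  # AlignedResult built from finalized + draft tokens
```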
main.py:
```
from transcriber import ParakeetTranscriber
from translator import EnglishDutchTranslator
import time
import argparse
import sounddevice as sd
import numpy as np
import threading
import queue
import requests

SAMPLE_RATE = 16000  # Hz
CHUNK_DURATION = 20.0  # Seconds for each audio segment
OVERLAP_DURATION = 6.6  # Overlap between segments (seconds)
SILENCE_VOLUME_PERCENTAGE = 0.2  # Silence threshold (currently unused)
DEBUG = True
MODEL_ID = "mlx-community/parakeet-tdt-0.6b-v2"
GLOBAL_START_TIME = time.time()

parser = argparse.ArgumentParser()
parser.add_argument("--device", type=str, default="default", help="Device to use (e.g., default)")
args = parser.parse_args()
DEVICE = args.device

print("⚙️ Loading Parakeet-MLX model...")
transcriber = ParakeetTranscriber(model_id=MODEL_ID, debug=DEBUG)
print("✅ Transcriber initialized!")

print("⚙️ Loading Dutch translation model...")
translator = EnglishDutchTranslator(debug=DEBUG)
print("✅ Translator initialized!")

transcribe_time = 0
translate_time = 0
audio_buffer = np.array([], dtype=np.float32)
buffer_lock = threading.Lock()
audio_queue = queue.Queue()  # currently unused
running = True
last_sentence_start_time = 0
last_sentence_end_time = 0


def audio_callback(indata, frames, time_info, status):
    # Third parameter renamed to time_info so it doesn't shadow the time module
    global audio_buffer
    if status:
        print(f"Audio status: {status}")
    # Append new audio data to the shared buffer
    with buffer_lock:
        audio_buffer = np.append(audio_buffer, indata[:, 0])


def process_audio():
    global audio_buffer, running
    # Calculate sizes in samples
    chunk_size = int(CHUNK_DURATION * SAMPLE_RATE)
    overlap_size = int(OVERLAP_DURATION * SAMPLE_RATE)
    step_size = chunk_size - overlap_size
    if step_size <= 0:
        raise ValueError("Invalid chunk/overlap configuration")
    while running:
        segment = None
        with buffer_lock:
            if len(audio_buffer) >= chunk_size:
                # Extract chunk
                segment = audio_buffer[:chunk_size].copy()
                # Advance buffer (keep overlap for next chunk)
                audio_buffer = audio_buffer[step_size:]
        if segment is None:
            # Not enough audio yet; sleep briefly instead of busy-waiting on the lock
            time.sleep(0.05)
            continue
        buffer_end_time = time.time() - GLOBAL_START_TIME
        # Submit for processing
        handle_segment(segment, end_time=buffer_end_time)


def on_translation_complete(dutch_text, error):
    if error:
        print(f"Error translating text: {error}")
        return
    global translate_time
    current_time = time.time()
    print(f"(translate {current_time - translate_time:.2f}s) Target: {dutch_text}")
    # Send POST to localhost:5353/publish
    try:
        requests.post(
            "http://localhost:5353/publish",
            json={"text": dutch_text, "timestamp": current_time},
        )
    except Exception as e:
        print(f"POST request failed: {e}")


def create_transcription_callback(start_time, end_time):
    print(f"Creating callback for segment from {start_time:.2f} to {end_time:.2f}")

    def on_transcription_complete(results, error):
        if error:
            print(f"Error processing audio: {error}")
            return
        global transcribe_time, translate_time, last_sentence_end_time
        current_time = time.time()
        # Validate results
        if not results or not results[0]:
            print("No transcription result received.")
            return
        result = results[0]
        # Print debug info
        if result.sentences:
            abs_start = start_time + result.sentences[0].start
            print(f"Transcribed sentences ({abs_start:.2f}-{end_time:.2f}):")
            for s in result.sentences:
                print(f"  {start_time + s.start:.2f}: {s.text}")
        # Keep only sentences that start before the overlap region,
        # i.e. abs_start < (end_time - OVERLAP_DURATION)
        threshold = end_time - OVERLAP_DURATION
        sentences = []
        for sentence in result.sentences:
            abs_start = start_time + sentence.start
            if abs_start >= threshold:
                break  # Stop at first sentence in overlap region
            sentences.append(sentence)
        print(f"[{len(sentences)} sentences extracted]")
        # Build text output and update global end time
        text = ""
        if sentences:
            last_sentence = sentences[-1]
            last_sentence_end_time = start_time + last_sentence.end
            for s in sentences:
                text += f"{start_time + s.start:.2f}: {s.text}\n"
        print(f"(transcribe {current_time - transcribe_time:.2f}s) Source:\n{text}")
        # Uncomment to trigger translation
        # if text.strip():
        #     transcribe_time = current_time
        #     translate_time = current_time
        #     translator.translate_async(text, callback=on_translation_complete)

    return on_transcription_complete


def handle_segment(audio_data, end_time):
    global transcribe_time
    transcribe_time = time.time()
    start_time = end_time - CHUNK_DURATION
    # Use the transcriber's async API
    transcriber.transcribe_async(
        audio_data, callback=create_transcription_callback(start_time, end_time)
    )


# Start audio processing thread
processing_thread = threading.Thread(target=process_audio, daemon=True)
processing_thread.start()

# Start audio input stream
stream = sd.InputStream(
    samplerate=SAMPLE_RATE,
    channels=1,
    callback=audio_callback,
    # device=DEVICE
)

print("🎤 Starting audio capture...")
with stream:
    try:
        while running:
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("Shutting down...")
        running = False

processing_thread.join(timeout=1.0)
transcriber.shutdown()
print("All services stopped.")
```
transcriber/transcriber.py:
```
from typing import Union
import numpy as np
import mlx.core as mx
from concurrent.futures import ThreadPoolExecutor

from parakeet_mlx import from_pretrained
from parakeet_mlx.audio import get_logmel

DEBUG = False  # Set to True for debug output


class ParakeetTranscriber:
    def __init__(
        self,
        model_id: str = "mlx-community/parakeet-tdt-0.6b-v2",
        dtype: mx.Dtype = mx.bfloat16,
        max_workers: int = 3,
        debug=False,
    ):
        self.model_id = model_id
        self.dtype = dtype
        self.model = None
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.sample_rate = 16000  # Parakeet expects 16 kHz audio
        global DEBUG
        DEBUG = debug

    def load_model(self) -> None:
        # Lazily load the model on first use
        if self.model is None:
            self.model = from_pretrained(self.model_id, dtype=self.dtype)

    def transcribe(self, audio: Union[bytes, np.ndarray, mx.array]):
        if len(audio) == 0:
            if DEBUG:
                print("Received empty audio data, returning empty transcription.")
            return ""
        self.load_model()
        audio_mx = self._prepare_audio(audio)
        # Compute the log-mel spectrogram the model expects
        mel = get_logmel(audio_mx, self.model.preprocessor_config)
        mx.eval(mel)
        # Generate transcription
        results = self.model.generate(mel)
        # if DEBUG:
        #     print(f"Transcription results: {results}")
        return results

    def transcribe_async(self, audio: Union[bytes, np.ndarray, mx.array], callback):
        self.load_model()

        def _process_and_callback():
            try:
                result = self.transcribe(audio)
                callback(result, None)
            except Exception as e:
                if callback:
                    callback(None, e)
                raise

        return self.executor.submit(_process_and_callback)

    def _prepare_audio(self, audio: Union[bytes, np.ndarray, mx.array]) -> mx.array:
        if isinstance(audio, bytes):
            # Convert from raw bytes (assuming int16 format)
            audio_np = np.frombuffer(audio, dtype=np.int16).astype(np.float32)
            audio_np /= 32768.0  # Normalize to [-1, 1]
            return mx.array(audio_np)
        elif isinstance(audio, np.ndarray):
            # If already a numpy array, ensure it's normalized float32
            if audio.dtype != np.float32:
                audio = audio.astype(np.float32)
            if audio.max() > 1.0 or audio.min() < -1.0:
                audio /= max(abs(audio.max()), abs(audio.min()))
            return mx.array(audio)
        elif isinstance(audio, mx.array):
            # Already an MLX array; assumed to be normalized
            return audio
        else:
            raise ValueError(f"Unsupported audio type: {type(audio)}")

    def shutdown(self):
        if hasattr(self, "executor"):
            self.executor.shutdown(wait=False)
```
To get the best performance, we should improve this code. I suggest we use streaming instead of processing fixed, overlapping chunks.
Rewrite both main.py and transcriber/transcriber.py to use the streaming interface of the Parakeet model; a rough sketch of the direction follows.
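As a starting point (not a final implementation): this assumes the StreamingParakeet interface listed above; the queue-based hand-off, the `context_size` value, and the `result.text` field are my assumptions.
```
import queue
import numpy as np
import mlx.core as mx
import sounddevice as sd
from parakeet_mlx import from_pretrained

SAMPLE_RATE = 16000  # Parakeet expects 16 kHz mono audio
audio_queue = queue.Queue()

def audio_callback(indata, frames, time_info, status):
    # Keep the callback cheap: just hand samples to the consumer loop
    audio_queue.put(indata[:, 0].copy())

model = from_pretrained("mlx-community/parakeet-tdt-0.6b-v2")

# transcribe_stream keeps encoder caches and decoder state across calls,
# so the manual chunk/overlap/sentence-filtering bookkeeping above goes away.
with model.transcribe_stream(context_size=(256, 256)) as stream, sd.InputStream(
    samplerate=SAMPLE_RATE, channels=1, callback=audio_callback
):
    while True:
        samples = audio_queue.get()  # blocks until the mic delivers audio
        stream.add_audio(mx.array(samples.astype(np.float32)))
        print(stream.result.text)  # assumed: AlignedResult exposes .text
```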
| """ |