Example of real-time translation from a live video call running on Getstream.io
fal_wizper_stt.py
""" | |
Fal Wizper STT Plugin for Stream | |
Provides real-time audio transcription and translation using fal-ai/wizper (Whisper v3). | |
This plugin integrates with Stream's audio processing pipeline to provide high-quality | |
speech-to-text capabilities. | |
Example usage: | |
from fal_wizper_stt import FalWizperSTT | |
# For transcription | |
stt = FalWizperSTT(task="transcribe") | |
# For translation to Portuguese | |
stt = FalWizperSTT(task="translate", target_language="pt") | |
@stt.on("transcript") | |
async def on_transcript(text: str, user: Any, metadata: dict): | |
print(f"Transcript: {text}") | |
@stt.on("error") | |
async def on_error(error: str): | |
print(f"Error: {error}") | |
""" | |
import asyncio
import io
import logging
import os
import tempfile
import time
import wave
from typing import Any, Callable, Dict, List, Optional, Tuple

import fal_client
import numpy as np

from getstream.plugins.common import STT
from getstream.video.rtc.track_util import PcmData

logger = logging.getLogger(__name__)
class FalWizperSTT(STT):
    """
    Audio transcription and translation using fal-ai/wizper (Whisper v3).

    This plugin provides real-time speech-to-text capabilities using the fal-ai/wizper
    service, which is based on OpenAI's Whisper v3 model. It supports both transcription
    and translation tasks.

    Attributes:
        task: The task type - either "transcribe" or "translate"
        target_language: Target language code for translation (e.g., "pt" for Portuguese)
    """

    def __init__(self, task: str = "translate", target_language: str = "pt"):
        """
        Initialize FalWizperSTT

        Args:
            task: "transcribe" or "translate"
            target_language: Target language code (e.g., "pt" for Portuguese)
        """
        self.task = task
        self.target_language = target_language
        self._callbacks: Dict[str, list] = {"transcript": [], "error": []}
        self._sample_rate = 48000  # Expected input sample rate (Hz) from Stream's audio pipeline
        self._is_closed = False
        self.last_activity_time = time.time()
    def on(self, event: str) -> Callable:
        """
        Register event callback decorator.

        Supported events:
            - "transcript": Emitted when a transcript is received
            - "error": Emitted when an error occurs

        Args:
            event: The event name to listen for

        Returns:
            Decorator function for registering callbacks

        Example:
            @stt.on("transcript")
            async def on_transcript(text: str, user: Any, metadata: dict):
                print(f"Received transcript: {text}")
        """
        def decorator(callback: Callable) -> Callable:
            if event not in self._callbacks:
                self._callbacks[event] = []
            self._callbacks[event].append(callback)
            return callback
        return decorator
    def _emit(self, event: str, *args, **kwargs) -> None:
        """Emit event to registered callbacks"""
        callbacks = self._callbacks.get(event, [])
        if callbacks:
            for callback in callbacks:
                try:
                    # Fire-and-forget: schedule the async callback on the running event loop
                    asyncio.create_task(callback(*args, **kwargs))
                except Exception as e:
                    logger.error(f"Error in callback for event '{event}': {e}")
    def _pcm_to_wav_bytes(self, pcm_data: PcmData) -> bytes:
        """
        Convert PCM data to WAV format bytes.

        Args:
            pcm_data: PCM audio data from Stream's audio pipeline

        Returns:
            WAV format audio data as bytes

        Raises:
            AttributeError: If PCM data format is not recognized
        """
        # Try common attribute names
        audio_data = None
        for attr in ['data', 'samples', 'bytes', 'raw_data', 'audio_data', 'pcm_data']:
            if hasattr(pcm_data, attr):
                audio_data = getattr(pcm_data, attr)
                break
        if audio_data is None:
            raise AttributeError("Could not find audio data attribute in PcmData object")

        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)  # Mono
            wav_file.setsampwidth(2)  # 16-bit
            wav_file.setframerate(self._sample_rate)
            wav_file.writeframes(audio_data)
        wav_buffer.seek(0)
        return wav_buffer.read()
    def _numpy_to_wav_bytes(self, audio_array: np.ndarray) -> bytes:
        """
        Convert numpy array to WAV format bytes.

        Args:
            audio_array: Audio data as numpy array

        Returns:
            WAV format audio data as bytes
        """
        # Ensure audio is int16
        if audio_array.dtype != np.int16:
            if audio_array.dtype == np.float32 or audio_array.dtype == np.float64:
                # Convert from float [-1, 1] to int16, clipping to avoid integer overflow
                audio_array = (np.clip(audio_array, -1.0, 1.0) * 32767).astype(np.int16)
            else:
                audio_array = audio_array.astype(np.int16)

        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)  # Mono
            wav_file.setsampwidth(2)  # 16-bit
            wav_file.setframerate(self._sample_rate)
            wav_file.writeframes(audio_array.tobytes())
        wav_buffer.seek(0)
        return wav_buffer.read()
    async def _process_audio_impl(
        self, pcm_data: PcmData, user_metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[List[Tuple[bool, str, Dict[str, Any]]]]:
        """
        Process audio data through fal-ai/wizper for transcription.

        Args:
            pcm_data: The PCM audio data to process.
            user_metadata: Additional metadata about the user or session.

        Returns:
            None - fal-ai/wizper operates in asynchronous mode and emits events directly
            when transcripts arrive from the streaming service.
        """
        if self._is_closed:
            logger.warning("fal-ai/wizper connection is closed, ignoring audio")
            return None

        # Check if the input sample rate matches the expected sample rate
        if hasattr(pcm_data, 'sample_rate') and pcm_data.sample_rate != self._sample_rate:
            logger.warning(
                "Input audio sample rate (%s Hz) does not match the expected sample rate (%s Hz). "
                "This may result in incorrect transcriptions. Consider resampling the audio.",
                pcm_data.sample_rate,
                self._sample_rate,
            )

        # Update the last activity time
        self.last_activity_time = time.time()

        try:
            # Convert PCM to WAV format
            wav_data = self._pcm_to_wav_bytes(pcm_data)

            # Create temporary file for upload
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_file.write(wav_data)
                temp_file.flush()
                temp_file_path = temp_file.name

            try:
                # Upload file and get URL
                audio_url = fal_client.upload_file(temp_file_path)

                # Prepare request parameters
                input_params = {
                    "audio_url": audio_url,
                    "task": self.task,
                    "chunk_level": "segment",
                    "version": "3",
                    "language": self.target_language,
                }

                logger.debug(
                    "Sending audio data to fal-ai/wizper",
                    extra={"audio_bytes": len(wav_data)},
                )

                # Submit to fal-ai/wizper in a worker thread so the blocking call
                # does not stall the event loop
                result = await asyncio.get_running_loop().run_in_executor(
                    None,
                    lambda: fal_client.subscribe("fal-ai/wizper", input_params)
                )

                # Extract text from result
                if "text" in result:
                    text = result["text"].strip()
                    if text:
                        self._emit("transcript", text, user_metadata, {"chunks": result.get("chunks", [])})
            finally:
                # Clean up temporary file
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    pass
        except Exception as e:
            logger.error(f"fal-ai/wizper audio transmission error: {e}")
            self._emit("error", f"fal-ai/wizper audio transmission error: {e}")
            raise

        # Return None for asynchronous mode - events are emitted when they arrive
        return None
    async def process_speech_audio(self, speech_audio: np.ndarray, user: Any) -> None:
        """
        Process accumulated speech audio through fal-ai/wizper.

        This method is typically called by VAD (Voice Activity Detection) systems
        when speech segments are detected.

        Args:
            speech_audio: Accumulated speech audio as numpy array
            user: User metadata from the Stream call
        """
        if self._is_closed:
            return
        if len(speech_audio) == 0:
            return

        try:
            # Convert numpy array to WAV format
            wav_data = self._numpy_to_wav_bytes(speech_audio)

            # Create temporary file for upload
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_file.write(wav_data)
                temp_file.flush()
                temp_file_path = temp_file.name

            try:
                # Upload file and get URL
                audio_url = fal_client.upload_file(temp_file_path)

                # Prepare request parameters
                input_params = {
                    "audio_url": audio_url,
                    "task": self.task,
                    "chunk_level": "segment",
                    "version": "3",
                }

                # Add language for translation
                if self.task == "translate":
                    input_params["language"] = self.target_language

                logger.debug(
                    "Sending speech audio to fal-ai/wizper",
                    extra={"audio_bytes": len(wav_data)},
                )

                # Submit to fal-ai/wizper in a worker thread so the blocking call
                # does not stall the event loop
                result = await asyncio.get_running_loop().run_in_executor(
                    None,
                    lambda: fal_client.subscribe("fal-ai/wizper", input_params)
                )

                # Extract text from result
                if "text" in result:
                    text = result["text"].strip()
                    if text:
                        self._emit("transcript", text, user, {"chunks": result.get("chunks", [])})
            finally:
                # Clean up temporary file
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    pass
        except Exception as e:
            logger.error(f"FalWizper processing error: {str(e)}")
            self._emit("error", f"FalWizper processing error: {str(e)}")
    async def process_audio(self, pcm_data: PcmData, user: Any) -> None:
        """
        Process audio data through fal-ai/wizper.

        This is the main entry point for audio processing. It handles individual
        audio frames from Stream's real-time audio pipeline.

        Args:
            pcm_data: PCM audio data from Stream
            user: User metadata from the Stream call
        """
        await self._process_audio_impl(pcm_data, user)

    async def close(self) -> None:
        """
        Cleanup resources and close the plugin.

        This method should be called when the plugin is no longer needed
        to ensure proper cleanup of resources.
        """
        self._is_closed = True
        self._callbacks.clear()
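A minimal sketch of exercising the plugin outside of a live call, useful for local testing. It assumes fal credentials are already configured and a hypothetical sample.wav file that is 48 kHz mono 16-bit PCM, matching the sample rate hard-coded in the plugin:

import asyncio
import wave

import numpy as np

from fal_wizper_stt import FalWizperSTT


async def transcribe_file(path: str) -> None:
    stt = FalWizperSTT(task="transcribe")

    @stt.on("transcript")
    async def on_transcript(text, user, metadata):
        print(f"Transcript: {text}")

    @stt.on("error")
    async def on_error(error):
        print(f"Error: {error}")

    # Read the whole file as 16-bit PCM samples (assumed 48 kHz mono,
    # to match the plugin's hard-coded sample rate).
    with wave.open(path, "rb") as wav_file:
        frames = wav_file.readframes(wav_file.getnframes())
    samples = np.frombuffer(frames, dtype=np.int16)

    await stt.process_speech_audio(samples, user=None)
    # Transcript callbacks are scheduled fire-and-forget; give them a moment to run.
    await asyncio.sleep(1.0)
    await stt.close()


if __name__ == "__main__":
    asyncio.run(transcribe_file("sample.wav"))  # hypothetical file name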
main.py
#!/usr/bin/env python3
"""
Example: Real-time Call Translation with fal-ai/wizper

Uses the fal-ai/wizper (Whisper v3) model for speech-to-text translation.

Steps:
    1. Create two temporary Stream users (human + bot)
    2. Spin up a new call and open it in your browser
    3. The bot joins and pipes every audio frame to fal-ai/wizper
    4. Final transcripts are printed with timestamps

Run:
    python main.py
"""
from __future__ import annotations

import asyncio
import logging
import os
import time
import uuid
import webbrowser
from typing import Any
from urllib.parse import urlencode

from dotenv import load_dotenv

from getstream.models import UserRequest
from getstream.stream import Stream
from getstream.video import rtc
from getstream.video.rtc.track_util import PcmData
from getstream.plugins.silero.vad import SileroVAD

from fal_wizper_stt import FalWizperSTT

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def create_user(client: Stream, id: str, name: str) -> None:
    """
    Create a user with a unique Stream ID.

    Args:
        client: Stream client instance
        id: Unique user ID
        name: Display name for the user
    """
    user_request = UserRequest(id=id, name=name)
    client.upsert_users(user_request)
def open_browser(api_key: str, token: str, call_id: str) -> str:
    """
    Helper function to open browser with Stream call link.

    Args:
        api_key: Stream API key
        token: JWT token for the user
        call_id: ID of the call

    Returns:
        The URL that was opened
    """
    base_url = f"{os.getenv('EXAMPLE_BASE_URL')}/join/"
    params = {"api_key": api_key, "token": token, "skip_lobby": "true"}
    url = f"{base_url}{call_id}?{urlencode(params)}"
    print(f"Opening browser to: {url}")
    try:
        webbrowser.open(url)
        print("Browser opened successfully!")
    except Exception as e:
        print(f"Failed to open browser: {e}")
        print(f"Please manually open this URL: {url}")
    return url
async def main() -> None:  # noqa: D401
    print("🎙️ Stream + fal-ai/wizper Real-time Transcription Example")
    print("=" * 60)

    load_dotenv()
    client = Stream.from_env()

    call_id = str(uuid.uuid4())
    print(f"📞 Call ID: {call_id}")

    user_id = f"user-{uuid.uuid4()}"
    create_user(client, user_id, "My User")
    logging.info("👤 Created user: %s", user_id)
    user_token = client.create_token(user_id, expiration=3600)

    bot_user_id = f"fal-wizper-bot-{uuid.uuid4()}"
    create_user(client, bot_user_id, "Fal Wizper Bot")
    logging.info("🤖 Created bot user: %s", bot_user_id)

    call = client.video.call("default", call_id)
    call.get_or_create(data={"created_by_id": bot_user_id})
    print(f"📞 Call created: {call_id}")

    open_browser(client.api_key, user_token, call_id)
    print("\n🤖 Starting transcription bot…")
    print("Speak in the browser and see transcripts below. Press Ctrl+C to stop.\n")

    # Initialize fal-ai/wizper for transcription
    stt = FalWizperSTT(task="transcribe", target_language="pt")

    # Initialize VAD processor
    vad = SileroVAD()
    print("✅ Audio processing pipeline ready: VAD → fal-ai/wizper STT")

    try:
        async with await rtc.join(call, bot_user_id) as connection:
            print(f"✅ Bot joined call: {call_id}")

            @connection.on("audio")
            async def _on_audio(pcm: PcmData, user):
                await vad.process_audio(pcm, user)

            @vad.on("audio")
            async def _on_speech_detected(pcm: PcmData, user):
                print(
                    f"🎤 Speech detected from user: {user.name if hasattr(user, 'name') else user}, duration: {pcm.duration:.2f}s"
                )
                await stt.process_audio(pcm, user)

            @stt.on("transcript")
            async def _on_transcript(text: str, user: Any, metadata: dict):
                ts = time.strftime("%H:%M:%S")
                who = user.name if hasattr(user, 'name') else str(user) if user else "unknown"
                print(f"[{ts}] {who}: {text}")

            @stt.on("error")
            async def _on_stt_error(err):
                print(f"\n❌ STT Error: {err}")

            print("🎧 Listening for audio… (Press Ctrl+C to stop)")
            await connection.wait()
    except asyncio.CancelledError:
        print("\n⏹️ Stopping transcription bot…")
    except Exception as e:  # noqa: BLE001
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        await stt.close()
        client.delete_users([user_id, bot_user_id])
        print("🧹 Cleanup completed")


if __name__ == "__main__":
    asyncio.run(main())
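The example relies on a handful of environment variables, typically loaded from a .env file via load_dotenv(). Below is a minimal pre-flight check sketch; the STREAM_API_KEY, STREAM_API_SECRET and FAL_KEY names are assumptions about how Stream.from_env() and fal_client are usually configured, while EXAMPLE_BASE_URL is read directly by open_browser():

import os

# Hypothetical pre-flight check for main.py. STREAM_API_KEY / STREAM_API_SECRET
# (used by Stream.from_env()) and FAL_KEY (used by fal_client) are assumptions based
# on the libraries' usual configuration; EXAMPLE_BASE_URL is read in open_browser().
REQUIRED_ENV = ("STREAM_API_KEY", "STREAM_API_SECRET", "FAL_KEY", "EXAMPLE_BASE_URL")

missing = [name for name in REQUIRED_ENV if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")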