@Nash0x7E2
Created July 31, 2025 02:35
Example of real-time translation from a live video call running on Getstream.io
"""
Fal Wizper STT Plugin for Stream
Provides real-time audio transcription and translation using fal-ai/wizper (Whisper v3).
This plugin integrates with Stream's audio processing pipeline to provide high-quality
speech-to-text capabilities.
Example usage:
from fal_wizper_stt import FalWizperSTT
# For transcription
stt = FalWizperSTT(task="transcribe")
# For translation to Portuguese
stt = FalWizperSTT(task="translate", target_language="pt")
@stt.on("transcript")
async def on_transcript(text: str, user: Any, metadata: dict):
print(f"Transcript: {text}")
@stt.on("error")
async def on_error(error: str):
print(f"Error: {error}")
"""
import asyncio
import io
import logging
import os
import tempfile
import time
import wave
from typing import Any, Callable, Dict, List, Optional, Tuple

import fal_client
import numpy as np

from getstream.plugins.common import STT
from getstream.video.rtc.track_util import PcmData

logger = logging.getLogger(__name__)
class FalWizperSTT(STT):
    """
    Audio transcription and translation using fal-ai/wizper (Whisper v3).

    This plugin provides real-time speech-to-text capabilities using the fal-ai/wizper
    service, which is based on OpenAI's Whisper v3 model. It supports both transcription
    and translation tasks.

    Attributes:
        task: The task type - either "transcribe" or "translate"
        target_language: Target language code for translation (e.g., "pt" for Portuguese)
    """

    def __init__(self, task: str = "translate", target_language: str = "pt"):
        """
        Initialize FalWizperSTT.

        Args:
            task: "transcribe" or "translate"
            target_language: Target language code (e.g., "pt" for Portuguese)
        """
        self.task = task
        self.target_language = target_language
        self._callbacks: Dict[str, list] = {"transcript": [], "error": []}
        self._sample_rate = 48000
        self._is_closed = False
        self.last_activity_time = time.time()
    def on(self, event: str) -> Callable:
        """
        Register an event callback decorator.

        Supported events:
        - "transcript": Emitted when a transcript is received
        - "error": Emitted when an error occurs

        Args:
            event: The event name to listen for

        Returns:
            Decorator function for registering callbacks

        Example:
            @stt.on("transcript")
            async def on_transcript(text: str, user: Any, metadata: dict):
                print(f"Received transcript: {text}")
        """
        def decorator(callback: Callable) -> Callable:
            if event not in self._callbacks:
                self._callbacks[event] = []
            self._callbacks[event].append(callback)
            return callback
        return decorator

    def _emit(self, event: str, *args, **kwargs) -> None:
        """Emit an event to all registered callbacks."""
        for callback in self._callbacks.get(event, []):
            try:
                # Callbacks are async; schedule them on the running event loop
                asyncio.create_task(callback(*args, **kwargs))
            except Exception as e:
                logger.error(f"Error in callback for event '{event}': {e}")
    def _pcm_to_wav_bytes(self, pcm_data: PcmData) -> bytes:
        """
        Convert PCM data to WAV format bytes.

        Args:
            pcm_data: PCM audio data from Stream's audio pipeline

        Returns:
            WAV format audio data as bytes

        Raises:
            AttributeError: If the PCM data format is not recognized
        """
        # Try common attribute names for the raw audio payload
        audio_data = None
        for attr in ['data', 'samples', 'bytes', 'raw_data', 'audio_data', 'pcm_data']:
            if hasattr(pcm_data, attr):
                audio_data = getattr(pcm_data, attr)
                break
        if audio_data is None:
            raise AttributeError("Could not find audio data attribute in PcmData object")

        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)  # Mono
            wav_file.setsampwidth(2)  # 16-bit
            wav_file.setframerate(self._sample_rate)
            wav_file.writeframes(audio_data)
        wav_buffer.seek(0)
        return wav_buffer.read()

    def _numpy_to_wav_bytes(self, audio_array: np.ndarray) -> bytes:
        """
        Convert a numpy array to WAV format bytes.

        Args:
            audio_array: Audio data as a numpy array

        Returns:
            WAV format audio data as bytes
        """
        # Ensure audio is int16
        if audio_array.dtype != np.int16:
            if audio_array.dtype in (np.float32, np.float64):
                # Convert from float [-1, 1] to int16
                audio_array = (audio_array * 32767).astype(np.int16)
            else:
                audio_array = audio_array.astype(np.int16)

        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wav_file:
            wav_file.setnchannels(1)  # Mono
            wav_file.setsampwidth(2)  # 16-bit
            wav_file.setframerate(self._sample_rate)
            wav_file.writeframes(audio_array.tobytes())
        wav_buffer.seek(0)
        return wav_buffer.read()
    async def _process_audio_impl(
        self, pcm_data: PcmData, user_metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[List[Tuple[bool, str, Dict[str, Any]]]]:
        """
        Process audio data through fal-ai/wizper for transcription.

        Args:
            pcm_data: The PCM audio data to process.
            user_metadata: Additional metadata about the user or session.

        Returns:
            None - results are not returned directly; transcripts are emitted
            through the "transcript" event once fal-ai/wizper responds.
        """
        if self._is_closed:
            logger.warning("fal-ai/wizper connection is closed, ignoring audio")
            return None

        # Check if the input sample rate matches the expected sample rate
        if hasattr(pcm_data, 'sample_rate') and pcm_data.sample_rate != self._sample_rate:
            logger.warning(
                "Input audio sample rate (%s Hz) does not match the expected sample rate (%s Hz). "
                "This may result in incorrect transcriptions. Consider resampling the audio.",
                pcm_data.sample_rate,
                self._sample_rate,
            )

        # Update the last activity time
        self.last_activity_time = time.time()

        try:
            # Convert PCM to WAV format
            wav_data = self._pcm_to_wav_bytes(pcm_data)

            # Create temporary file for upload
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_file.write(wav_data)
                temp_file.flush()
                temp_file_path = temp_file.name

            try:
                # Upload file and get URL
                audio_url = fal_client.upload_file(temp_file_path)

                # Prepare request parameters
                input_params = {
                    "audio_url": audio_url,
                    "task": self.task,
                    "chunk_level": "segment",
                    "version": "3",
                    "language": self.target_language,
                }

                logger.debug(
                    "Sending audio data to fal-ai/wizper",
                    extra={"audio_bytes": len(wav_data)},
                )

                # Submit to fal-ai/wizper
                result = await asyncio.get_event_loop().run_in_executor(
                    None,
                    lambda: fal_client.subscribe("fal-ai/wizper", input_params),
                )

                # Extract text from result
                if "text" in result:
                    text = result["text"].strip()
                    if text:
                        self._emit("transcript", text, user_metadata, {"chunks": result.get("chunks", [])})
            finally:
                # Clean up temporary file
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    pass
        except Exception as e:
            logger.error(f"fal-ai/wizper audio transmission error: {e}")
            self._emit("error", f"fal-ai/wizper audio transmission error: {e}")
            raise

        # Return None - events are emitted when results arrive
        return None
    async def process_speech_audio(self, speech_audio: np.ndarray, user: Any) -> None:
        """
        Process accumulated speech audio through fal-ai/wizper.

        This method is typically called by VAD (Voice Activity Detection) systems
        when speech segments are detected.

        Args:
            speech_audio: Accumulated speech audio as a numpy array
            user: User metadata from the Stream call
        """
        if self._is_closed:
            return
        if len(speech_audio) == 0:
            return

        try:
            # Convert numpy array to WAV format
            wav_data = self._numpy_to_wav_bytes(speech_audio)

            # Create temporary file for upload
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_file.write(wav_data)
                temp_file.flush()
                temp_file_path = temp_file.name

            try:
                # Upload file and get URL
                audio_url = fal_client.upload_file(temp_file_path)

                # Prepare request parameters
                input_params = {
                    "audio_url": audio_url,
                    "task": self.task,
                    "chunk_level": "segment",
                    "version": "3",
                }

                # Add target language for translation
                if self.task == "translate":
                    input_params["language"] = self.target_language

                logger.debug(
                    "Sending speech audio to fal-ai/wizper",
                    extra={"audio_bytes": len(wav_data)},
                )

                # Submit to fal-ai/wizper
                result = await asyncio.get_event_loop().run_in_executor(
                    None,
                    lambda: fal_client.subscribe("fal-ai/wizper", input_params),
                )

                # Extract text from result
                if "text" in result:
                    text = result["text"].strip()
                    if text:
                        self._emit("transcript", text, user, {"chunks": result.get("chunks", [])})
            finally:
                # Clean up temporary file
                try:
                    os.unlink(temp_file_path)
                except OSError:
                    pass
        except Exception as e:
            logger.error(f"FalWizper processing error: {str(e)}")
            self._emit("error", f"FalWizper processing error: {str(e)}")
    async def process_audio(self, pcm_data: PcmData, user: Any) -> None:
        """
        Process audio data through fal-ai/wizper.

        This is the main entry point for audio processing. It handles individual
        audio frames from Stream's real-time audio pipeline.

        Args:
            pcm_data: PCM audio data from Stream
            user: User metadata from the Stream call
        """
        await self._process_audio_impl(pcm_data, user)

    async def close(self) -> None:
        """
        Clean up resources and close the plugin.

        This method should be called when the plugin is no longer needed
        to ensure proper cleanup of resources.
        """
        self._is_closed = True
        self._callbacks.clear()
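
A minimal standalone sketch for trying the plugin without a live call, assuming a prerecorded 48 kHz mono 16-bit WAV clip on disk (the sample.wav filename and the dict passed as user are placeholders, and fal_client must already be configured with a Fal API key):

import asyncio
import wave
import numpy as np
from fal_wizper_stt import FalWizperSTT

async def run() -> None:
    stt = FalWizperSTT(task="transcribe")

    @stt.on("transcript")
    async def on_transcript(text, user, metadata):
        print(f"Transcript: {text}")

    # Load a local test clip; assumed to be 48 kHz, mono, 16-bit PCM (hypothetical file)
    with wave.open("sample.wav", "rb") as wav_file:
        frames = wav_file.readframes(wav_file.getnframes())
    audio = np.frombuffer(frames, dtype=np.int16)

    await stt.process_speech_audio(audio, user={"id": "test-user"})
    await asyncio.sleep(2)  # give the scheduled transcript callback task time to run
    await stt.close()

if __name__ == "__main__":
    asyncio.run(run())
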
main.py

#!/usr/bin/env python3
"""
Example: Real-time Call Translation with fal-ai/wizper

Uses the fal-ai/wizper (Whisper v3) model for speech-to-text translation.

Steps:
1. Create two temporary Stream users (human + bot)
2. Spin up a new call and open it in your browser
3. The bot joins and pipes every detected speech segment to fal-ai/wizper
4. Final transcripts are printed with timestamps

Run:
    python main.py
"""
from __future__ import annotations

import asyncio
import logging
import os
import time
import uuid
import webbrowser
from urllib.parse import urlencode

from dotenv import load_dotenv

from getstream.models import UserRequest
from getstream.plugins.silero.vad import SileroVAD
from getstream.stream import Stream
from getstream.video import rtc
from getstream.video.rtc.track_util import PcmData

from fal_wizper_stt import FalWizperSTT

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def create_user(client: Stream, id: str, name: str) -> None:
    """
    Create a user with a unique Stream ID.

    Args:
        client: Stream client instance
        id: Unique user ID
        name: Display name for the user
    """
    user_request = UserRequest(id=id, name=name)
    client.upsert_users(user_request)


def open_browser(api_key: str, token: str, call_id: str) -> str:
    """
    Helper function to open the browser with a Stream call link.

    Args:
        api_key: Stream API key
        token: JWT token for the user
        call_id: ID of the call

    Returns:
        The URL that was opened
    """
    base_url = f"{os.getenv('EXAMPLE_BASE_URL')}/join/"
    params = {"api_key": api_key, "token": token, "skip_lobby": "true"}
    url = f"{base_url}{call_id}?{urlencode(params)}"
    print(f"Opening browser to: {url}")
    try:
        webbrowser.open(url)
        print("Browser opened successfully!")
    except Exception as e:
        print(f"Failed to open browser: {e}")
        print(f"Please manually open this URL: {url}")
    return url
async def main() -> None:  # noqa: D401
    print("🎙️ Stream + fal-ai/wizper Real-time Transcription Example")
    print("=" * 60)

    load_dotenv()
    client = Stream.from_env()

    call_id = str(uuid.uuid4())
    print(f"📞 Call ID: {call_id}")

    user_id = f"user-{uuid.uuid4()}"
    create_user(client, user_id, "My User")
    logging.info("👤 Created user: %s", user_id)
    user_token = client.create_token(user_id, expiration=3600)

    bot_user_id = f"fal-wizper-bot-{uuid.uuid4()}"
    create_user(client, bot_user_id, "Fal Wizper Bot")
    logging.info("🤖 Created bot user: %s", bot_user_id)

    call = client.video.call("default", call_id)
    call.get_or_create(data={"created_by_id": bot_user_id})
    print(f"📞 Call created: {call_id}")

    open_browser(client.api_key, user_token, call_id)

    print("\n🤖 Starting transcription bot…")
    print("Speak in the browser and see transcripts below. Press Ctrl+C to stop.\n")

    # Initialize fal-ai/wizper for transcription
    stt = FalWizperSTT(task="transcribe", target_language="pt")

    # Initialize VAD processor
    vad = SileroVAD()
    print("✅ Audio processing pipeline ready: VAD → fal-ai/wizper STT")

    try:
        async with await rtc.join(call, bot_user_id) as connection:
            print(f"✅ Bot joined call: {call_id}")

            @connection.on("audio")
            async def _on_audio(pcm: PcmData, user):
                await vad.process_audio(pcm, user)

            @vad.on("audio")
            async def _on_speech_detected(pcm: PcmData, user):
                print(
                    f"🎤 Speech detected from user: {user.name if hasattr(user, 'name') else user}, "
                    f"duration: {pcm.duration:.2f}s"
                )
                await stt.process_audio(pcm, user)

            @stt.on("transcript")
            async def _on_transcript(text: str, user, metadata: dict):
                ts = time.strftime("%H:%M:%S")
                who = user.name if hasattr(user, 'name') else str(user) if user else "unknown"
                print(f"[{ts}] {who}: {text}")

            @stt.on("error")
            async def _on_stt_error(err):
                print(f"\n❌ STT Error: {err}")

            print("🎧 Listening for audio… (Press Ctrl+C to stop)")
            await connection.wait()
    except asyncio.CancelledError:
        print("\n⏹️ Stopping transcription bot…")
    except Exception as e:  # noqa: BLE001
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        await stt.close()
        client.delete_users([user_id, bot_user_id])
        print("🧹 Cleanup completed")


if __name__ == "__main__":
    asyncio.run(main())
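
The example pulls its configuration from environment variables via load_dotenv(): Stream.from_env() reads the Stream credentials, fal_client reads its API key, and open_browser reads EXAMPLE_BASE_URL. A minimal pre-flight check, assuming the variable names STREAM_API_KEY, STREAM_API_SECRET, and FAL_KEY (verify these against your installed SDK versions; only EXAMPLE_BASE_URL is read explicitly in the code above):

import os
from dotenv import load_dotenv

load_dotenv()

# Assumed variable names; EXAMPLE_BASE_URL is the only one referenced directly in main.py
required = ["STREAM_API_KEY", "STREAM_API_SECRET", "FAL_KEY", "EXAMPLE_BASE_URL"]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
print("Environment looks good - run `python main.py` to start the demo.")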