Created
April 3, 2025 20:47
-
-
Save ShayneP/969d541245ca536a0c8a67c2621c0a47 to your computer and use it in GitHub Desktop.
Universal Translator running on Pi Zero 2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Real-time translator that uses Gladia to transcribe speech in any supported language and translate it to English.
# The translated text is shown on a Pirate Audio display attached to a Raspberry Pi Zero 2 W.
from pathlib import Path | |
from dotenv import load_dotenv | |
from livekit.agents import JobContext, WorkerOptions, cli | |
from livekit.agents.voice import Agent, AgentSession | |
from livekit.plugins import gladia | |
from PIL import Image | |
from PIL import ImageDraw | |
from PIL import ImageFont | |
import st7789 | |
import textwrap | |
import os | |
import logging | |
import asyncio | |
import time | |
# Set up logging: every record goes both to a debug file and to the console.
# The file handler is opened as UTF-8 explicitly because the records include
# transcripts in arbitrary languages; without an explicit encoding the file is
# opened with the locale default, which may not be able to represent them.
log_file_handler = logging.FileHandler("translator_debug.log", encoding="utf-8")
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[log_file_handler, logging.StreamHandler()],
)
logger = logging.getLogger("gladia-translator")
# Load environment variables from the .env file that sits one directory above
# the directory containing this script.
_ENV_FILE = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=_ENV_FILE)
# Set up the screen (ST7789 driver over SPI).
SPI_SPEED_MHZ = 20
_screen_options = {
    "rotation": 90,    # Needed to display the right way up on Pirate Audio
    "port": 0,         # SPI port
    "cs": 1,           # SPI port chip-select channel
    "dc": 9,           # BCM pin used for data/command
    "backlight": 13,
    "spi_speed_hz": SPI_SPEED_MHZ * 1000 * 1000,
}
screen = st7789.ST7789(**_screen_options)
# Panel dimensions as reported by the driver; used by all drawing helpers.
width, height = screen.width, screen.height
# Create the off-screen image that every drawing helper renders into.
# It is sized from the driver's reported dimensions (instead of a hard-coded
# 240x240) so it always agrees with the width/height used when clearing it.
image = Image.new("RGB", (width, height), (0, 0, 0))
draw = ImageDraw.Draw(image)
# Set up fonts: regular face for body text, bold face for titles.
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 18)
title_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 22)
# Display startup screen
def show_startup_screen():
    """Blank the display and draw the "LiveKit / Translator / Starting..." splash."""
    white = (255, 255, 255)
    draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
    # (y offset, text, font, colour) for each row of the splash.
    splash_rows = (
        (10, "LiveKit", title_font, white),
        (40, "Translator", title_font, white),
        (80, "Starting...", font, (200, 200, 200)),
    )
    for y_offset, label, face, colour in splash_rows:
        draw.text((10, y_offset), label, font=face, fill=colour)
    screen.display(image)
def show_status_message(message):
    """Clear the screen, draw a "Status" title with *message* below it, and log it.

    The message is wrapped at 25 characters per line; lines start at y=50 and
    are spaced 20 pixels apart.
    """
    draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
    draw.text((10, 10), "Status", font=title_font, fill=(255, 255, 255))
    for row, chunk in enumerate(textwrap.wrap(message, width=25)):
        draw.text((10, 50 + 20 * row), chunk, font=font, fill=(200, 200, 200))
    screen.display(image)
    logger.info(f"Status: {message}")
# Display translation text
def display_translation(text):
    """Render translated text under a "Gladia Translator" title.

    *text* may contain several translations separated by newlines. Each one is
    wrapped on its own at 25 characters per line so a new translation always
    starts on a new row. Only the most recent 10 wrapped rows are drawn,
    starting at y=40 with 20 pixels between rows.

    On any failure the error is logged with its traceback and a status message
    describing the error is shown instead.
    """
    try:
        # Clear screen
        draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
        # Add title
        draw.text((10, 10), "Gladia Translator", font=title_font, fill=(255, 255, 255))

        # textwrap.wrap() turns newlines into spaces, so wrapping the whole
        # string at once would merge separate translations into one paragraph.
        # Wrap each newline-separated entry independently instead.
        wrapped_rows = []
        for entry in text.split("\n"):
            wrapped_rows.extend(textwrap.wrap(entry, width=25))

        # Keep only the most recent rows that fit on screen; a negative slice
        # already returns the whole list when it is shorter than max_lines.
        max_lines = 10
        y_position = 40
        for row in wrapped_rows[-max_lines:]:
            draw.text((10, y_position), row, font=font, fill=(200, 200, 200))
            y_position += 20  # Line spacing
        screen.display(image)
    except Exception as e:
        logger.exception("Display error: %s", e)
        show_status_message(f"Display error: {str(e)}")
# Strong references to fire-and-forget tasks. The event loop itself only keeps
# weak references to tasks, so the asyncio documentation requires callers to
# hold a reference for as long as the task should stay alive.
_background_tasks = set()


async def entrypoint(ctx: JobContext):
    """Connect to the LiveKit room and run the translation session.

    Steps, in order:
      1. Show the startup screen and connect via ``ctx.connect()``.
      2. Start a background task that shows a "Listening..." status after one
         minute without transcripts.
      3. Register a ``user_input_transcribed`` handler that displays the last
         five final translations and appends them to ``translation_log.txt``.
      4. Start an ``AgentSession`` whose agent uses Gladia STT with translation
         to English enabled.

    Any exception raised during these steps is logged with its traceback and
    shown on the display; it is not re-raised.
    """
    show_startup_screen()
    logger.info("Starting translation agent")
    activity_task = None
    try:
        await ctx.connect()
        logger.info("Connected to LiveKit")

        # Track activity time for idle status. A monotonic clock is used
        # because only elapsed time matters and wall-clock adjustments must
        # not distort the idle interval.
        last_activity_time = time.monotonic()

        # Store recent translations to display multiple on screen
        recent_translations = []
        max_saved_translations = 5  # Keep track of last 5 translations

        # Status check timer
        async def check_activity():
            nonlocal last_activity_time
            while True:
                await asyncio.sleep(30)  # Check every 30 seconds
                if time.monotonic() - last_activity_time > 60:  # No activity for 1 minute
                    logger.info("No activity detected for 1 minute, displaying status")
                    try:
                        show_status_message("Listening... Speak in any language.")
                    except Exception:
                        # A display failure must not silently end the checker.
                        logger.exception("Failed to display idle status")
                    last_activity_time = time.monotonic()

        # Start activity checker and keep a strong reference to it.
        activity_task = asyncio.create_task(check_activity())
        _background_tasks.add(activity_task)
        activity_task.add_done_callback(_background_tasks.discard)

        # Create and configure the session
        session = AgentSession()

        @session.on("user_input_transcribed")
        def on_transcript(transcript):
            nonlocal last_activity_time
            try:
                # Log the received transcript
                logger.info(f"Received transcript: final={transcript.is_final}, text={transcript.transcript}")
                # Update activity time
                last_activity_time = time.monotonic()

                # For Gladia translation, the translation is in the transcript field
                # and the original is in the original_transcript field if available
                translation = transcript.transcript
                original = getattr(transcript, 'original_transcript', None)
                logger.info(f"Translation: {translation}")
                if original:
                    logger.info(f"Original: {original}")

                # Interim results are ignored - only final translations are shown.
                if not transcript.is_final:
                    return

                # NOTE(review): the source's indentation was lost; display and
                # file logging are assumed to be nested under this check.
                if translation.strip() and (original or len(translation) > 3):
                    recent_translations.append(translation.strip())
                    # Keep only the most recent translations (in place, so the
                    # list object shared with this closure is never rebound).
                    del recent_translations[:-max_saved_translations]

                    display_translation("\n".join(recent_translations))

                    # Save both translation and original to file. UTF-8 is set
                    # explicitly because the original text can be in any language.
                    with open("translation_log.txt", "a", encoding="utf-8") as f:
                        if original:
                            f.write(f"Original: {original}\nTranslation: {translation}\n\n")
                        else:
                            f.write(f"Translation: {translation}\n\n")
            except Exception as e:
                logger.exception("Error processing transcript: %s", e)
                show_status_message(f"Error: {str(e)}")

        # Display ready message
        show_status_message("Ready! Speak in any language.")
        logger.info("Starting agent session")
        await session.start(
            agent=Agent(
                instructions="You are a translator that transcribes speech in any language and translates it to English.",
                stt=gladia.STT(
                    languages=["fr", "en", "es", "de", "zh", "ja", "ko", "ru", "pt", "it"],
                    code_switching=True,
                    sample_rate=16000,
                    bit_depth=16,
                    channels=1,
                    encoding="wav/pcm",
                    translation_enabled=True,
                    translation_target_languages=["en"],
                    translation_model="base",
                    translation_match_original_utterances=True,
                ),
                allow_interruptions=True,  # True to handle continuous speech better
            ),
            room=ctx.room,
        )
        logger.info("Agent session started")
    except Exception as e:
        # Stop the idle checker so it cannot overwrite the error screen with
        # a "Listening..." status a minute later.
        if activity_task is not None:
            activity_task.cancel()
        logger.exception("Startup error: %s", e)
        show_status_message(f"Error: {str(e)}")
# Script entry point: run the LiveKit worker and report how it ended.
if __name__ == "__main__":
    try:
        logger.info("Starting application")
        cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
    except KeyboardInterrupt:
        # Clear screen on exit
        draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
        screen.display(image)
        logger.info("Exiting translator - keyboard interrupt")
        print("\nExiting translator")
    except Exception as e:
        # Log with the traceback so the cause of a fatal error is recoverable.
        logger.exception("Fatal error: %s", e)
        try:
            show_status_message(f"Fatal error: {str(e)}")
        except Exception:
            # The display itself may be what failed; do not let that mask the
            # original error or skip the console message below.
            logger.exception("Could not show the fatal error on the display")
        print(f"\nFatal error: {str(e)}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment