Skip to content

Instantly share code, notes, and snippets.

@ShayneP
Created April 3, 2025 20:47
Show Gist options
  • Save ShayneP/969d541245ca536a0c8a67c2621c0a47 to your computer and use it in GitHub Desktop.
Save ShayneP/969d541245ca536a0c8a67c2621c0a47 to your computer and use it in GitHub Desktop.
Universal Translator running on Pi Zero 2
# Real-time translator that uses Gladia to transcribe and translate any language to English
# It displays the translated text on a Pirate Audio display on a Raspberry Pi Zero 2 W
from pathlib import Path
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import gladia
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
import st7789
import textwrap
import os
import logging
import asyncio
import time
# Configure logging: records are sent both to a debug log file and to the console
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_log_handlers = [
    logging.FileHandler("translator_debug.log"),
    logging.StreamHandler(),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)
# Named logger shared by the rest of this script
logger = logging.getLogger("gladia-translator")
# Load environment variables from the .env file located in the parent of this script's directory
_env_file = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=_env_file)
# Set up the screen
SPI_SPEED_MHZ = 20
_screen_options = dict(
    rotation=90,  # Needed to display the right way up on Pirate Audio
    port=0,  # SPI port
    cs=1,  # SPI port Chip-select channel
    dc=9,  # BCM pin used for data/command
    backlight=13,
    spi_speed_hz=SPI_SPEED_MHZ * 1_000_000,  # MHz -> Hz
)
screen = st7789.ST7789(**_screen_options)
# Panel dimensions as reported by the driver; used by the drawing helpers to clear the canvas
width, height = screen.width, screen.height
# Off-screen RGB canvas: helpers draw onto it and then hand it to screen.display()
_CANVAS_SIZE = (240, 240)
_CANVAS_BACKGROUND = (0, 0, 0)
image = Image.new(mode="RGB", size=_CANVAS_SIZE, color=_CANVAS_BACKGROUND)
draw = ImageDraw.Draw(image)
# Fonts: regular DejaVu Sans for body text, bold DejaVu Sans for titles
font = ImageFont.truetype(
    font="/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    size=18,
)
title_font = ImageFont.truetype(
    font="/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    size=22,
)
# Display startup screen
def show_startup_screen():
    """Clear the canvas, draw the startup banner and push it to the screen."""
    white = (255, 255, 255)
    grey = (200, 200, 200)
    draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
    # (position, caption, typeface, colour) for every banner row, top to bottom
    banner_rows = (
        ((10, 10), "LiveKit", title_font, white),
        ((10, 40), "Translator", title_font, white),
        ((10, 80), "Starting...", font, grey),
    )
    for position, caption, typeface, colour in banner_rows:
        draw.text(position, caption, font=typeface, fill=colour)
    screen.display(image)
def show_status_message(message):
    """Show *message* under a "Status" heading and write it to the log.

    The message is wrapped to at most 25 characters per row; rows start at
    y=50 and are spaced 20 pixels apart.
    """
    draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
    draw.text((10, 10), "Status", font=title_font, fill=(255, 255, 255))
    for row, line in enumerate(textwrap.wrap(message, width=25)):
        draw.text((10, 50 + 20 * row), line, font=font, fill=(200, 200, 200))
    screen.display(image)
    logger.info(f"Status: {message}")
# Display translation text
def display_translation(text):
    """Render translated text under the "Gladia Translator" title.

    Each newline-separated entry in *text* is wrapped to 25 characters on its
    own, so separate translations always start on a new row. Only the last 10
    wrapped rows are drawn (rows start at y=40 and are 20 pixels apart).

    Any exception raised while drawing is logged and reported through
    show_status_message() instead of being propagated from the drawing code.
    """
    try:
        # Clear screen
        draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
        # Add title
        draw.text((10, 10), "Gladia Translator", font=title_font, fill=(255, 255, 255))
        # textwrap.wrap() replaces newlines with spaces, which would merge all
        # entries into one paragraph; wrap every entry separately instead.
        wrapped_text = []
        for entry in text.splitlines():
            wrapped_text.extend(textwrap.wrap(entry, width=25))  # 25 characters max per row
        # Display only the most recent lines that fit on screen
        max_lines = 10
        display_lines = wrapped_text[-max_lines:]
        for row, line in enumerate(display_lines):
            # 20 px line spacing below the title
            draw.text((10, 40 + 20 * row), line, font=font, fill=(200, 200, 200))
        screen.display(image)
    except Exception as e:
        logger.error(f"Display error: {str(e)}")
        show_status_message(f"Display error: {str(e)}")
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage-collected
# before it finishes.
_background_tasks = set()


async def entrypoint(ctx: JobContext):
    """Run the translator job: connect, listen for transcripts and display them.

    Steps performed:
      1. Show the startup screen and connect through ``ctx.connect()``.
      2. Start an idle watchdog that wakes every 30 seconds and shows a
         "Listening..." status after more than 60 seconds without transcripts.
      3. Register a "user_input_transcribed" handler that keeps the last five
         final translations, shows them on the display and appends them to
         translation_log.txt.
      4. Start an AgentSession whose agent uses Gladia STT with translation
         to English enabled.

    Exceptions raised during these steps are logged and shown on the screen;
    they are not re-raised.
    """
    show_startup_screen()
    logger.info("Starting translation agent")
    activity_task = None
    try:
        await ctx.connect()
        logger.info("Connected to LiveKit")
        # Track activity time for idle status
        last_activity_time = time.time()
        # Store recent translations to display multiple on screen
        recent_translations = []
        max_saved_translations = 5  # Keep track of last 5 translations

        # Status check timer
        async def check_activity():
            nonlocal last_activity_time
            while True:
                await asyncio.sleep(30)  # Check every 30 seconds
                if time.time() - last_activity_time > 60:  # If no activity for 1 minute
                    logger.info("No activity detected for 1 minute, displaying status")
                    show_status_message("Listening... Speak in any language.")
                    last_activity_time = time.time()

        # Start activity checker and keep a strong reference to it until it is done
        activity_task = asyncio.create_task(check_activity())
        _background_tasks.add(activity_task)
        activity_task.add_done_callback(_background_tasks.discard)

        # Create and configure the session
        session = AgentSession()

        @session.on("user_input_transcribed")
        def on_transcript(transcript):
            nonlocal last_activity_time
            try:
                # Log the received transcript
                logger.info(f"Received transcript: final={transcript.is_final}, text={transcript.transcript}")
                # Update activity time
                last_activity_time = time.time()
                # For Gladia translation, the translation is in the transcript field
                # and the original is in the original_transcript field if available
                translation = transcript.transcript
                original = getattr(transcript, 'original_transcript', None)
                logger.info(f"Translation: {translation}")
                if original:
                    logger.info(f"Original: {original}")
                # Only process and display final translations
                if transcript.is_final:
                    # Add valid translation to our list if it's meaningful
                    if translation.strip() and (original or len(translation) > 3):
                        recent_translations.append(translation.strip())
                        # Keep only the most recent translations (trimmed in place)
                        del recent_translations[:-max_saved_translations]
                    # Only update the display when we have a final translation
                    display_text = "\n".join(recent_translations)
                    display_translation(display_text)
                    # Save both translation and original to file. The text can be in
                    # any language, so do not depend on the locale's default encoding.
                    with open("translation_log.txt", "a", encoding="utf-8") as f:
                        if original:
                            f.write(f"Original: {original}\nTranslation: {translation}\n\n")
                        else:
                            f.write(f"Translation: {translation}\n\n")
                # We don't do anything with interim results - just wait for final translations
            except Exception as e:
                logger.error(f"Error processing transcript: {str(e)}")
                show_status_message(f"Error: {str(e)}")

        # Display ready message
        show_status_message("Ready! Speak in any language.")
        logger.info("Starting agent session")
        await session.start(
            agent=Agent(
                instructions="You are a translator that transcribes speech in any language and translates it to English.",
                stt=gladia.STT(
                    languages=["fr", "en", "es", "de", "zh", "ja", "ko", "ru", "pt", "it"],
                    code_switching=True,
                    sample_rate=16000,
                    bit_depth=16,
                    channels=1,
                    encoding="wav/pcm",
                    translation_enabled=True,
                    translation_target_languages=["en"],
                    translation_model="base",
                    translation_match_original_utterances=True,
                ),
                allow_interruptions=True,  # Changed to True to handle continuous speech better
            ),
            room=ctx.room,
        )
        logger.info("Agent session started")
    except Exception as e:
        logger.error(f"Startup error: {str(e)}")
        show_status_message(f"Error: {str(e)}")
        # Stop the idle watchdog so it cannot later replace the error shown
        # above with a misleading "Listening..." status.
        if activity_task is not None:
            activity_task.cancel()
if __name__ == "__main__":
    # Script entry point: run the LiveKit worker and report how it ended.
    try:
        logger.info("Starting application")
        worker_options = WorkerOptions(entrypoint_fnc=entrypoint)
        cli.run_app(worker_options)
    except KeyboardInterrupt:
        # Blank the display before leaving
        draw.rectangle((0, 0, width, height), fill=(0, 0, 0))
        screen.display(image)
        logger.info("Exiting translator - keyboard interrupt")
        print("\nExiting translator")
    except Exception as exc:
        # Same text goes to the log, the display and the console
        fatal_message = f"Fatal error: {str(exc)}"
        logger.error(fatal_message)
        show_status_message(fatal_message)
        print(f"\n{fatal_message}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment