|
#!/usr/bin/env uv run |
|
# /// script |
|
# requires-python = ">=3.11" |
|
# dependencies = [ |
|
# "keyring", |
|
# "google-genai", |
|
# "rich", |
|
# "pyperclip", |
|
# "moviepy", |
|
# ] |
|
# /// |
|
import os |
|
import time |
|
# import google.generativeai as genai |
|
from google import genai |
|
from google.genai import types |
|
|
|
import sys # Import the sys module to access command-line arguments |
|
import mimetypes # Import the mimetypes module to guess mime type |
|
import keyring |
|
from rich.markdown import Markdown |
|
from rich.console import Console |
|
import pyperclip |
|
import argparse # Add this import at the top with other imports |
|
from moviepy.video.io.VideoFileClip import VideoFileClip |
|
import tempfile |
|
from PIL import Image |
|
import textwrap |
|
|
|
# Resolve the Gemini API key: environment variable first, then (on macOS)
# the system keychain. Exits the process when no key can be found.

# Try to get the API key from the environment
api_key = os.environ.get("GEMINI_API_KEY")

# When the key came from the environment, cache it in the keychain so future
# runs can omit the variable. ("Salvando chave no keychain" = saving the key
# to the keychain.)
if api_key:
    print("Salvando chave no keychain")
    keyring.set_password("google-gemini", "default", api_key)

# If not in the environment, try the macOS Keychain
if not api_key and sys.platform == "darwin":
    try:
        api_key = keyring.get_password("google-gemini", "default")
        if api_key:
            print("API key found in macOS Keychain.")
        else:
            # Key absent from both sources: nothing to do but bail out.
            print("Gemini API key not found in environment or macOS Keychain.")
            print("Please set the GEMINI_API_KEY environment variable or store it in the keychain.")
            sys.exit(1)
    except ImportError:
        # NOTE(review): keyring is imported unconditionally at the top of the
        # file, so this branch looks unreachable — confirm before removing.
        print("Keychain access requires the 'keyring' library. Please install it using 'pip install keyring'.")
        print("Alternatively, set the GEMINI_API_KEY environment variable.")
        sys.exit(1)
    except Exception as e:
        # Keychain may be locked or misconfigured; surface the error and stop.
        print(f"Error accessing macOS Keychain: {e}")
        print("Please ensure keyring is configured correctly or set the GEMINI_API_KEY environment variable.")
        sys.exit(1)
elif not api_key:
    # Non-macOS platform with no environment variable: no keychain fallback.
    print("Gemini API key not found in environment.")
    if sys.platform == "darwin":
        print("You can store it in the macOS Keychain using 'python -m keyring set google-gemini default'.")
    print("Please set the GEMINI_API_KEY environment variable.")
    sys.exit(1)

# Single client shared by all subsequent API calls; v1alpha is required for
# the experimental "thinking" model used in the summarization step.
client = genai.Client(api_key=api_key, http_options={'api_version': 'v1alpha'})
|
|
|
def upload_to_gemini(path, mime_type=None):
    """Upload the given file to Gemini and return its File handle.

    Args:
        path: Filesystem path of the file to upload.
        mime_type: Optional MIME type to declare for the upload; when omitted
            the API infers it from the file.

    Returns:
        The uploaded ``File`` object (its ``uri`` and ``name`` are used by the
        rest of the script).

    See https://ai.google.dev/gemini-api/docs/prompting_with_media
    """
    print(f"Uploading file {path} to google…")
    # Forward mime_type when provided; previously the parameter was accepted
    # but silently ignored. NOTE(review): assumes this google-genai version
    # accepts a `config` dict on files.upload (the file already passes
    # http_options as a dict) — confirm against the pinned SDK version.
    config = {'mime_type': mime_type} if mime_type else None
    file = client.files.upload(path=path, config=config)
    print(f"Uploaded file '{file.display_name}' as: {file.uri}")
    return file
|
|
|
def wait_for_files_active(files):
    """Block until every uploaded file has finished server-side processing.

    Some files uploaded to the Gemini API need to be processed before they can be
    used as prompt inputs. The status can be seen by querying the file's "state"
    field.

    This implementation uses a simple blocking polling loop. Production code
    should probably employ a more sophisticated approach.

    Raises:
        Exception: if any file ends up in a state other than ACTIVE.
    """
    print("Waiting for file processing...")
    for uploaded in files:
        current = client.files.get(name=uploaded.name)
        # Re-query every 5 seconds while the server is still processing.
        while current.state == "PROCESSING":
            print(".", end="", flush=True)
            time.sleep(5)
            current = client.files.get(name=uploaded.name)
        if current.state != "ACTIVE":
            raise Exception(f"File {current.name} failed to process")
    print("...all files ready")
    print()
|
|
|
# Sampling parameters shared by both model calls; only the output budget
# differs between transcription and summarization.
_BASE_GENERATION_OPTS = {
    "temperature": 1,
    "top_p": 0.95,
    "top_k": 40,
    "response_mime_type": "text/plain",
}

# Configuration for the transcription model (assuming gemini-pro or similar)
transcription_generation_config = {**_BASE_GENERATION_OPTS, "max_output_tokens": 16000}

# Configuration for the summarization model (gemini-2.0-flash-exp) — smaller
# token budget since a summary is much shorter than a full transcript.
summarization_generation_config = {**_BASE_GENERATION_OPTS, "max_output_tokens": 1000}

# Command-line interface: one positional input file plus an opt-out flag.
parser = argparse.ArgumentParser(description='Transcribe and summarize audio/video files using Gemini API')
parser.add_argument('file_path', help='Path to the audio/video file to process')
parser.add_argument('--no-summary', action='store_true', help='Skip the summarization step')
args = parser.parse_args()

file_path = args.file_path
|
|
|
def is_audio_file(mime_type):
    """Check if the file is an audio file based on MIME type.

    Args:
        mime_type: A MIME type string such as 'audio/mpeg', or None when the
            type could not be guessed.

    Returns:
        bool: True for 'audio/*' types; False otherwise. (The original
        returned None — merely falsy — when mime_type was None or empty.)
    """
    return bool(mime_type) and mime_type.startswith('audio/')
|
|
|
def extract_audio(video_path):
    """Extract the audio track of *video_path* into a temporary .mp3 file.

    Args:
        video_path: Path to the source video file.

    Returns:
        Path to the extracted .mp3 file in the system temp directory. The
        file is not deleted automatically; the caller owns its cleanup.

    Exits the process with status 1 when extraction fails (script-style
    error handling, consistent with the rest of this file).
    """
    print(f"Extracting audio from video file {video_path}...")

    # Unique temp name instead of the previous fixed "extracted_audio.mp3",
    # which would be silently clobbered by concurrent or back-to-back runs.
    fd, temp_audio = tempfile.mkstemp(suffix=".mp3", prefix="extracted_audio_")
    os.close(fd)  # moviepy reopens the path itself; we only need the name.

    video = None
    try:
        video = VideoFileClip(video_path)
        video.audio.write_audiofile(temp_audio, logger=None)
        return temp_audio
    except Exception as e:
        print(f"Error extracting audio: {e}")
        sys.exit(1)
    finally:
        # Close the clip even on failure (the original leaked it when
        # write_audiofile raised).
        if video is not None:
            video.close()
|
|
|
# Guess the MIME type based on the file extension
mime_type, _ = mimetypes.guess_type(file_path)

if not is_audio_file(mime_type):
    # Only video files are acceptable as a non-audio input: pull the audio
    # track out first. Anything else is rejected outright.
    if not (mime_type and mime_type.startswith('video/')):
        print(f"Error: File '{file_path}' is not an audio or video file (MIME type: {mime_type})")
        print("This script only processes audio or video files.")
        sys.exit(1)
    print("Video file detected, extracting audio...")
    file_path = extract_audio(file_path)
    mime_type = 'audio/mp3'

files = [upload_to_gemini(file_path, mime_type=mime_type)]

# Some files have a processing delay. Wait for them to be ready.
wait_for_files_active(files)
|
|
|
# --- Transcription Step ---
# Seed the chat history with the uploaded audio so the follow-up message only
# needs to carry the instruction text.
_audio_part = types.Part.from_uri(
    file_uri=files[0].uri,
    mime_type=files[0].mime_type,
)
transcription_chat_session = client.chats.create(
    model="gemini-2.0-flash-exp",  # Or the model you used for transcription
    config=types.GenerateContentConfig(**transcription_generation_config),
    history=[{"role": "user", "parts": [_audio_part]}],
)

print("Requesting transcription…")
transcription_response = transcription_chat_session.send_message("Transcreva o conteúdo desse áudio")

print(transcription_response.usage_metadata)
print("\n--- Transcription Result ---\n\n")
# Wrap the transcript to 80 columns for terminal readability.
for wrapped_line in textwrap.wrap(transcription_response.text, width=80):
    print(wrapped_line)
print("\n\n--- END Transcription ---\n\n")

# Kept under this name: the summarization step below reads it.
transcription_result = transcription_response.text
|
|
|
# --- Summarization Step (skipped with --no-summary) ---
if not args.no_summary:
    print("\nGenerating summary...")
    summarization_chat_session = client.chats.create(
        model="gemini-2.0-flash-thinking-exp",  # "thinking" model: emits thought parts
        # Bug fix: this previously reused transcription_generation_config,
        # leaving summarization_generation_config (defined above with an
        # explicit "Adjust max tokens for the summary" comment) unused.
        config=types.GenerateContentConfig(**summarization_generation_config),
        history=[
            {
                "role": "user",
                "parts": [
                    {"text": transcription_result},
                ],
            }
        ])

    summarization_response = summarization_chat_session.send_message("Agora organiza as ideias num sumário")

    print("\n--- Summarization Result (Gemini 2.0 Flash Thinking Experimental) ---")
    # Bug fix: previously printed transcription_response.usage_metadata here,
    # i.e. the wrong call's token accounting under the summarization header.
    print(f"{summarization_response.usage_metadata}\n\n")

    # Separate the model's internal "thought" parts from the actual answer.
    thoughts = []
    response = []
    for part in summarization_response.candidates[0].content.parts:
        if part.thought:  # was `== True`; truthiness also covers None cleanly
            thoughts.append(part)
        else:
            response.append(part)

    console = Console()
    summary_text = "".join(part.text for part in response)
    console.print(Markdown(f"--------\n{summary_text}\n--------\n"))
    pyperclip.copy(summary_text)
    print("\n(Summary copied to clipboard)")
else:
    print("\n(Skipping summarization as requested)")
    # With no summary, the clipboard gets the raw transcription instead.
    pyperclip.copy(transcription_result)
    print("(Transcription copied to clipboard)")