Created
November 6, 2024 20:10
-
-
Save pmarkun/1d5644dc4209ffe889ead2b03632b9d2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
from datetime import datetime | |
from pytubefix import YouTube | |
import whisper | |
from openai import OpenAI | |
from dotenv import load_dotenv | |
import sys | |
# Carregar variáveis de ambiente do arquivo .env | |
load_dotenv() | |
API_KEY = os.getenv("OPENAI_API_KEY") | |
MEDIA_FOLDER = os.getenv("MEDIA_FOLDER", "media") # Pasta padrão 'media' se não configurado no .env | |
class VideoProcessor: | |
def __init__(self, url, force=False): | |
self.url = url | |
self.force = force | |
yt = YouTube(self.url) | |
self.video_id = yt.video_id | |
self.title = yt.title | |
self.today = yt.publish_date.strftime("%Y-%m-%d") | |
self.client = OpenAI(api_key=API_KEY) | |
self.media_folder = self.create_media_folder() | |
def create_media_folder(self): | |
folder = os.path.join(MEDIA_FOLDER, self.today) | |
os.makedirs(folder, exist_ok=True) | |
return folder | |
def download_video(self): | |
video_path = os.path.join(self.media_folder, f"{self.video_id}.mp4") | |
if os.path.exists(video_path) and not self.force: | |
print(f"[DEBUG] Vídeo já existe: {video_path}") | |
return video_path | |
print("[DEBUG] Baixando vídeo...") | |
yt = YouTube(self.url) | |
return yt.streams.filter(only_audio=True).first().download(output_path=self.media_folder, filename=f"{self.video_id}.mp4") | |
def transcribe_audio(self, file_path): | |
transcript_path = os.path.join(self.media_folder, f"transcript_{self.video_id}.txt") | |
if os.path.exists(transcript_path) and not self.force: | |
print(f"[DEBUG] Transcrição já existe: {transcript_path}") | |
with open(transcript_path, "r", encoding="utf-8") as file: | |
return transcript_path | |
print("[DEBUG] Transcrevendo áudio...") | |
model = whisper.load_model("base") | |
result = model.transcribe(file_path, verbose=False, fp16=False) | |
transcript = result['text'] | |
with open(transcript_path, "w", encoding="utf-8") as file: | |
file.write(transcript) | |
return transcript_path | |
def summarize_large_transcript(self, transcript_path, max_tokens=12000): | |
summary_path = os.path.join(self.media_folder, f"bigsummary_{self.video_id}.txt") | |
if os.path.exists(summary_path) and not self.force: | |
print(f"[DEBUG] Sumário completo já existe: {summary_path}") | |
with open(summary_path, "r", encoding="utf-8") as file: | |
return summary_path | |
print("[DEBUG] Resumindo transcrição...") | |
with open(transcript_path, "r", encoding="utf-8") as file: | |
transcript = file.read() | |
chunks = self.split_text_into_chunks(transcript, max_tokens) | |
summaries = [self.call_openai_summary(chunk, "Resuma a transcrição abaixo:", max_tokens) for chunk in chunks] | |
combined_summary = " ".join(summaries) | |
with open(summary_path, "w", encoding="utf-8") as file: | |
file.write(combined_summary) | |
return summary_path | |
def summarize_for_whatsapp(self, big_summary_path, video_title): | |
whatsapp_summary_path = os.path.join(self.media_folder, f"smallsummary_{self.video_id}.md") | |
if os.path.exists(whatsapp_summary_path) and not self.force: | |
print(f"[DEBUG] Sumário WhatsApp já existe: {whatsapp_summary_path}") | |
with open(whatsapp_summary_path, "r", encoding="utf-8") as file: | |
return whatsapp_summary_path | |
print("[DEBUG] Criando resumo para WhatsApp...") | |
sys_prompt = "Você é Lex, uma inteligência artificial legislativa feita para explicar a política para o cidadão. Você é bem humorada e crítica, e acompanha sempre as reuniões da Câmara Municipal de São Paulo." | |
prompt = f"""Abaixo está um resumo da transcrição do vídeo '{video_title}' composta pelas falas de diferentes vereadores. | |
Crie uma mensagem de WhatsApp com um breve resumo imparcial que destaca os fatos mais relevantes da sessão, com ênfase nas decisões políticas tomadas na reunião para informar o cidadão. | |
Algumas regras: | |
1) Quando mencionar um projeto legislativo, explique o assunto do projeto e identifique o tipo, número e ano no formato: ([tipo] [numero]/[ano]) ex: (PL 123/2024) | |
2) Procure sempre refereciar o vereador pelo nome, ao invés de só colocar 'um vereador', 'alguém'. | |
3) Evite mensagens genéricas como 'a reunião foi produtiva', 'houve muita discussão', 'a sessão foi longa'. | |
Retorne apenas a mensagem com os devidos emojis.""" | |
with open(big_summary_path, "r", encoding="utf-8") as file: | |
summary = file.read() | |
whatsapp_summary = self.call_openai_summary(prompt + "\n\n" + summary, sys_prompt) | |
with open(whatsapp_summary_path, "w", encoding="utf-8") as file: | |
file.write(whatsapp_summary) | |
return whatsapp_summary_path | |
def call_openai_summary(self, text, prompt, max_tokens=3000): | |
response = self.client.chat.completions.create( | |
model="gpt-4o-mini", | |
messages=[ | |
{"role": "system", "content": f"{prompt}"}, | |
{"role": "user", "content": f"{text}"} | |
], | |
temperature=0.7, | |
max_tokens=max_tokens, | |
) | |
return response.choices[0].message.content.strip() | |
def split_text_into_chunks(self, text, max_tokens): | |
words = text.split() | |
chunks, current_chunk, current_length = [], [], 0 | |
for word in words: | |
if current_length + len(word) + 1 > max_tokens: | |
chunks.append(" ".join(current_chunk)) | |
current_chunk, current_length = [], 0 | |
current_chunk.append(word) | |
current_length += len(word) + 1 | |
if current_chunk: | |
chunks.append(" ".join(current_chunk)) | |
return chunks | |
def update_media_json(self, title, url, transcript_path, whatsapp_path): | |
media_json_path = os.path.join(MEDIA_FOLDER, "media.json") | |
media_data = { | |
"date": self.today, | |
"title": title, | |
"url": url, | |
"transcript": transcript_path, | |
"whatsapp": whatsapp_path | |
} | |
if os.path.exists(media_json_path): | |
with open(media_json_path, "r", encoding="utf-8") as file: | |
data = json.load(file) | |
else: | |
data = [] | |
#checa se a url já existe no arquivo | |
for media in data: | |
if media["url"] == url: | |
print(f"[DEBUG] URL já existe no arquivo: {media_json_path}") | |
return | |
data.append(media_data) | |
with open(media_json_path, "w", encoding="utf-8") as file: | |
json.dump(data, file, indent=4) | |
def process(self): | |
video_file = self.download_video() | |
transcript_path = self.transcribe_audio(video_file) | |
big_summary_path = self.summarize_large_transcript(transcript_path) | |
whatsapp_summary_path = self.summarize_for_whatsapp(big_summary_path, self.title) | |
self.update_media_json(self.title, self.url, transcript_path, whatsapp_summary_path) | |
# Executa o script via linha de comando | |
if __name__ == "__main__": | |
force = "--force" in sys.argv | |
url_index = 1 if not force else 2 | |
if len(sys.argv) <= url_index: | |
print("Uso: python script.py <URL do vídeo> [--force]") | |
else: | |
processor = VideoProcessor(sys.argv[url_index], force=force) | |
processor.process() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment