Skip to content

Instantly share code, notes, and snippets.

@etrobot
Created January 11, 2025 12:24
Show Gist options
  • Save etrobot/4bcc09206ebb487278d83befda025308 to your computer and use it in GitHub Desktop.
Save etrobot/4bcc09206ebb487278d83befda025308 to your computer and use it in GitHub Desktop.
search_youtube_n_summarize.py
import os,json,time
import openai
from cookie_loader import save_cookies_as_Netscape
import webvtt
from yt_dlp import YoutubeDL
PROXY='http://127.0.0.1:7890'
def get_llm_config(scheme='openai'):
    """Return the (api_key, base_url, model) triple for an LLM provider.

    :param scheme: 'siliconflow' to read the SiliconFlow env vars;
        any other value (default 'openai') falls back to the OpenAI
        settings with a hard-coded 'gpt-4o-mini' model.
    :return: tuple of (api_key, base_url, model) — entries may be None
        when the corresponding environment variable is unset.
    """
    if scheme == 'siliconflow':
        return (
            os.getenv("SILICONFLOW_API_KEY"),
            os.getenv("SILICONFLOW_BASE_URL"),
            os.getenv("LLM_MODEL"),
        )
    # Default / unknown scheme: OpenAI credentials with a fixed model.
    return os.getenv("OPENAI_API_KEY"), os.getenv("OPENAI_BASE_URL"), 'gpt-4o-mini'
def llm_gen_json(llm: "openai.Client", model: str, query: str, format: dict, debug=False, max_retries: int = 20) -> dict:
    """Ask the LLM for a JSON object matching *format* and retry until valid.

    :param llm: an OpenAI-compatible client (anything exposing
        ``.chat.completions.create``).
    :param model: model identifier passed through to the API.
    :param query: user prompt; a JSON-format instruction is appended.
    :param format: dict whose keys must all be present in the reply.
    :param debug: unused; kept for backward compatibility.
    :param max_retries: maximum number of attempts before giving up.
    :return: the parsed dict, or None after exhausting all retries.
    """
    prompt = f"\noutput in json format :\n{str(format)}\n"
    retry = max_retries
    while retry > 0:
        try:
            llm_response = llm.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": query + prompt}],
                response_format={"type": "json_object"}
            )
            result = json.loads(llm_response.choices[0].message.content)
            if not isinstance(result, dict):
                # Some models wrap the object in a one-element list; unwrap it.
                if isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict):
                    result = result[0]
                else:
                    print('error', f"Invalid action received, will retry\n{result}\n")
                    # FIX: previously `continue` skipped the decrement, so a
                    # persistently-invalid reply looped forever.
                    retry -= 1
                    continue
            if not all(k in result for k in format):
                print('error', f"Invalid action received, will retry\n{result}\n")
                retry -= 1  # FIX: count this failed attempt too
                continue
            return result
        except Exception as e:
            print('error', e)
            # Linear backoff: 0s on the first failure, growing by 10s each time.
            time.sleep((max_retries - retry) * 10)
            retry -= 1
    return None
def search_videos(keyword, max_results=5, proxy=None, cookies=None):
    """Search YouTube through yt-dlp's ``ytsearch`` pseudo-URL.

    :param keyword: search phrase
    :param max_results: maximum number of results to fetch
    :param proxy: optional proxy server address for yt-dlp
    :param cookies: optional path to a Netscape-format cookies file
    :return: list of dicts with 'title', 'intro', 'id', 'url' keys
        (empty when nothing is found or an error occurs)
    """
    opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,          # metadata only, no media resolution
        'force_generic_extractor': True,
    }
    if proxy:
        opts['proxy'] = proxy
    if cookies:
        opts['cookiefile'] = cookies
    found = []
    with YoutubeDL(opts) as ydl:
        try:
            info = ydl.extract_info(f"ytsearch{max_results}:{keyword}", download=False)
            entries = info.get('entries')
            if not entries:
                print("未找到相关视频")
                return found
            for entry in entries:
                if not entry.get('url'):
                    continue  # skip entries with no resolvable URL
                found.append({
                    'title': entry.get('title'),
                    'intro': entry.get('description'),
                    'id': entry.get('id'),
                    'url': entry.get('url'),
                })
                print(f"\n找到视频: {entry.get('title')}")
                print(f"URL: https://youtube.com/watch?v={entry.get('id')}")
        except Exception as e:
            print(f"搜索时出错: {str(e)}")
    return found
def download_subs(video_info, proxy=None, cookies=None, filename=None, if_download_video=False):
    """Download a video's auto-generated English subtitles and convert to SRT.

    :param video_info: video info dict (needs 'url' and 'id') or a
        watch-page URL string
    :param proxy: optional proxy server address
    :param cookies: optional cookies file path
    :param filename: optional output basename; defaults to the video id
    :param if_download_video: also download the video itself when True
    :return: the basename used for the subtitle files, or None on failure
    """
    # Accept either a metadata dict or a bare URL string.
    if isinstance(video_info, str):
        video_url = video_info
        video_id = video_url.split('v=')[-1]  # crude id extraction from '...v=ID'
    else:
        video_url = video_info['url']
        video_id = video_info['id']
    if filename is None:
        filename = video_id  # video id is stable and filesystem-safe
    else:
        # Sanitize user-supplied names and cap the length for the filesystem.
        filename = filename.replace(' ', '_').replace('*', 'x')[:100]
    # FIX: these paths previously contained a literal '(unknown)' instead of
    # interpolating the basename, so every video mapped onto one shared file.
    if os.path.isfile(f'youtube/{filename}.en.srt'):
        print(f'{filename} 字幕已存在')
        return filename
    subtitle_opts = {
        'skip_download': not if_download_video,
        'writeautomaticsub': True,        # fetch auto-generated captions
        'subtitleslangs': ['en'],
        'outtmpl': f'youtube/{filename}.%(ext)s',
        'paths': {
            'home': '.'
        }
    }
    if proxy:
        subtitle_opts['proxy'] = proxy
    if cookies:
        subtitle_opts['cookiefile'] = cookies
    with YoutubeDL(subtitle_opts) as ydl_subs:
        try:
            if not os.path.exists(f'youtube/{filename}.en.vtt'):
                ydl_subs.download([video_url])
            # Convert VTT to SRT and capture the plain-text content.
            subtitle_content = convert_vtt_to_srt(f'youtube/{filename}.en.vtt', f'youtube/{filename}.en.srt')
            if subtitle_content:  # converter returns None on failure
                print(f"视频字幕下载并转换成功")
                return filename
            print("无法获取字幕内容")
        except Exception as e:
            print(f"下载或转换字幕时出错: {str(e)}")
    return None
def convert_vtt_to_srt(vtt_file, srt_file):
    """Convert a WebVTT subtitle file to SRT and return its plain text.

    The source VTT file is deleted after a successful conversion.

    :param vtt_file: path to the input VTT file
    :param srt_file: path for the SRT file to write
    :return: concatenated caption text, or None when conversion fails
    """
    try:
        vtt = webvtt.read(vtt_file)
        subtitle_content = ''
        with open(srt_file, 'w', encoding='utf-8') as f:
            for i, caption in enumerate(vtt.captions, 1):
                f.write(f"{i}\n")
                # SRT uses a comma as the millisecond separator, VTT a dot.
                f.write(f"{caption.start.replace('.', ',')} --> {caption.end.replace('.', ',')}\n")
                f.write(f"{caption.text}\n\n")
                subtitle_content += f"{caption.text}\n\n"
        # Remove the source VTT once the SRT is safely written.
        # (FIX: dropped a redundant function-local `import os`; the module
        # already imports os at the top of the file.)
        os.remove(vtt_file)
        return subtitle_content
    except Exception as e:
        # Best-effort: report and signal failure to the caller.
        print(f"转换字幕格式时出错: {str(e)}")
        return None
if __name__ == "__main__":
    # Export browser cookies so yt-dlp can access gated videos.
    cookies_file = save_cookies_as_Netscape('cookies/youtube.txt', 'youtube.com')
    target = 'Methods to make niches video,including finding niches and making video'
    videos = search_videos("AI", 5, PROXY, cookies_file)
    evaluate = []
    for video in videos:
        # FIX: cookies_file was previously passed positionally into the
        # `proxy` parameter slot, which then collided with proxy=PROXY
        # (TypeError: multiple values for 'proxy').
        filename = download_subs(video, proxy=PROXY, cookies=cookies_file, if_download_video=True)
        if filename is None:
            continue  # no subtitles available -> nothing to summarize
        # FIX: path previously used a literal '(unknown)' instead of {filename}.
        with open(f'youtube/{filename}.en.srt', 'r', encoding='utf-8') as srt:
            subtitle_content = srt.read()
        api_key, base_url, model = get_llm_config('siliconflow')
        model = 'deepseek-ai/DeepSeek-V2.5'  # override the env-configured model
        client = openai.Client(api_key=api_key, base_url=base_url)
        format = {
            'related_points': ['point1', 'point2', 'point3', ...],
        }
        evalute_json = llm_gen_json(
            client, model,
            f'video title: {video["title"]} \n\n video[intro]: {video["intro"]} \n\n auto-gent-subtitle: {subtitle_content}\n\nfrom this video, find out points related to ' + target,
            format,
        )
        if evalute_json is None:
            continue  # LLM never produced a valid reply; skip this video
        video['evalute_json'] = evalute_json
        print(video['title'])
        print(evalute_json)
        evaluate.append(video)
    # FIX: 'related_points' lives inside the nested 'evalute_json' dict;
    # the previous key lookup raised KeyError on every video.
    evaluate.sort(key=lambda x: len(x['evalute_json']['related_points']), reverse=True)
    for video in evaluate:
        print(video['title'], len(video['evalute_json']['related_points']))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment