Created
July 7, 2025 15:11
-
-
Save indiejoseph/b7a4007a6ae1cd255ec3bbfcc5c40c6a to your computer and use it in GitHub Desktop.
YouTube API client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import logging
import os
import subprocess
import xml.etree.ElementTree as ET
from dataclasses import MISSING, dataclass, field, fields
from typing import Any, Dict, List, Optional, Tuple

import httpx
import librosa
import yt_dlp
# Emit INFO-level records from this module's logging calls.
logging.basicConfig(level=logging.INFO)

# Maps caption language tags to the short codes used in subtitle prefixes.
# Tags that are not listed here are passed through unchanged by the callers
# (they look codes up with ``.get(code, code)``).
LANGUAGE_CODE_MAP: Dict[str, str] = {
    # Tags collapsed to "yue"
    "zh-HK": "yue",
    "yue-HK": "yue",
    # English maps to itself
    "en": "en",
    # Tags collapsed to "zh"
    "zh-CN": "zh",
    "zh-TW": "zh",
    "zh-Hant": "zh",
}
# --- Data Classes --- | |
@dataclass
class SimpleName:
    """Wrapper for a display name carried as a single ``simpleText`` string."""

    # The plain-text name; the field name mirrors the JSON key.
    simpleText: str
@dataclass
class CaptionTrack:
    """One caption track entry; field names mirror the JSON keys it is built from."""

    # URL the subtitle XML is fetched from.
    baseUrl: str
    # Display name of the track.
    name: SimpleName
    vssId: str
    # Language tag of the track; callers normalise it via LANGUAGE_CODE_MAP.
    languageCode: str
    isTranslatable: bool
    trackName: str
    # Optional markers; both default to None when not supplied.
    kind: Optional[str] = None
    # Tracks are only used for subtitles while this is None.
    asr: Optional[str] = None
@dataclass
class PlayerCaptionsTracklistRenderer:
    """Container for the caption tracks listed for a video."""

    # Parsed caption tracks, in the order they were supplied.
    captionTracks: List[CaptionTrack]
    # Left untyped; this file does not interpret these two values.
    audioTracks: Any
    translationLanguages: Any
    defaultAudioTrackIndex: int
@dataclass
class Captions:
    """Top-level ``captions`` object; it only wraps the tracklist renderer."""

    playerCaptionsTracklistRenderer: PlayerCaptionsTracklistRenderer
@dataclass
class VideoDetails:
    """Metadata of a single video; field names mirror the ``videoDetails`` JSON keys.

    Numeric-looking values (``lengthSeconds``, ``viewCount``) are kept as
    strings, exactly as declared here; no conversion is performed.
    """

    videoId: str
    title: str
    lengthSeconds: str
    isOwnerViewing: bool
    channelId: str
    shortDescription: str
    viewCount: str
    author: str
    isPrivate: bool
    isLiveContent: bool
    isCrawlable: bool
    thumbnail: Dict[str, Any]  # Assuming thumbnail is a dict with various sizes
    allowRatings: bool
    isUnpluggedCorpus: bool
    # Defaults to an empty list so that a payload without a "keywords" entry
    # can still be turned into a VideoDetails instead of raising TypeError.
    # A default_factory is used to avoid sharing one list between instances.
    keywords: List[str] = field(default_factory=list)
@dataclass
class YouTubeResponse:
    """Parsed player response; only ``videoDetails`` and ``captions`` are typed."""

    # Sections stored as received, without further parsing.
    responseContext: Any
    playabilityStatus: Any
    streamingData: Any
    playerConfig: Any
    # Structured video metadata.
    videoDetails: VideoDetails
    microformat: Any
    cards: Any
    trackingParams: str
    # None when no caption information was available.
    captions: Optional[Captions]
@dataclass
class Client:
    """The ``client`` part of a request context (language, client name, version)."""

    hl: str
    clientName: str
    clientVersion: str
@dataclass
class Request:
    """The ``request`` part of a request context; holds only the SSL flag."""

    useSsl: bool
@dataclass
class Context:
    """Request context pairing the client description with request options."""

    client: Client
    request: Request
@dataclass
class VideoInfoRequest:
    """Typed form of a video-info request: the video id plus its context."""

    videoId: str
    context: Context
# --- SRT Conversion --- | |
class Srt:
    """Helpers for turning timed-text XML into SubRip (SRT) formatted text."""

    @staticmethod
    def convert_xml_to_srt(
        xml: str,
        language_code: str,
        dual_xml=None,
        dual_subtitles_code=None,
        just_text=False,
    ) -> str:
        """Convert subtitle XML into SRT text, optionally with a second language.

        Every ``<text>`` element in ``xml`` becomes one cue. Its ``start`` and
        ``dur`` attributes (seconds, defaulting to ``"0"``) give the timing and
        its text, with newlines replaced by spaces, gives the content. Each
        cue is prefixed with ``(<code>)`` where the code is ``language_code``
        normalised through ``LANGUAGE_CODE_MAP``.

        Args:
            xml: Subtitle document for the primary language.
            language_code: Language tag of the primary subtitles.
            dual_xml: Optional subtitle document for a second language. Its
                cues are paired with the primary cues by position, not time.
            dual_subtitles_code: Language tag of the second language. The
                second-language text is only appended when this is given.
            just_text: When true, omit cue numbers and timestamps.

        Returns:
            The cues joined by blank lines.

        Raises:
            xml.etree.ElementTree.ParseError: If either document is not
                well-formed XML.
            ValueError: If a ``start`` or ``dur`` attribute is not a number.
        """

        def parse_cues(document: str) -> list:
            # One (start, dur, text) tuple per <text> element, document order.
            return [
                (
                    node.attrib.get("start", "0"),
                    node.attrib.get("dur", "0"),
                    (node.text or "").replace("\n", " ").strip(),
                )
                for node in ET.fromstring(document).findall(".//text")
            ]

        def secs_to_srt_timestamp(seconds: float) -> str:
            # Round once to whole milliseconds and carry upwards. Rounding only
            # the fractional part can produce 1000 ms (e.g. 1.9996 s) and an
            # invalid timestamp such as "00:00:01,1000".
            total_millis = int(round(seconds * 1000))
            total_secs, millis = divmod(total_millis, 1000)
            total_minutes, secs = divmod(total_secs, 60)
            hours, minutes = divmod(total_minutes, 60)
            return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"

        language_code_conv = LANGUAGE_CODE_MAP.get(language_code, language_code)
        language_prefix = f"({language_code_conv})"

        dual_language_code = None
        dual_transcript = []
        if dual_xml:
            if dual_subtitles_code:
                dual_language_code = LANGUAGE_CODE_MAP.get(
                    dual_subtitles_code, dual_subtitles_code
                )
            dual_transcript = parse_cues(dual_xml)

        result = []
        for idx, (start, dur, text) in enumerate(parse_cues(xml)):
            start_secs = float(start)
            end_secs = start_secs + float(dur)

            combined_text = f"{language_prefix}{text}"
            # Second-language line is added only when its code is known.
            if idx < len(dual_transcript) and dual_language_code:
                dual_text = dual_transcript[idx][2]
                combined_text += f"\n({dual_language_code}){dual_text}"

            if just_text:
                result.append(f"{combined_text}\n")
            else:
                start_time = secs_to_srt_timestamp(start_secs)
                end_time = secs_to_srt_timestamp(end_secs)
                result.append(
                    f"{idx+1}\n{start_time} --> {end_time}\n{combined_text}\n"
                )
        return "\n".join(result)
# --- Main Client --- | |
class Youtube:
    """Client for the YouTube player endpoint, caption tracks and audio download.

    All operations are static methods; the instance only stores the ffmpeg
    location passed to the constructor.
    """

    API_URL = "https://www.youtube.com/youtubei/v1/player"
    CLIENT_VERSION = "2.20241107.11.00"

    def __init__(self, ffmpeg_location="/usr/bin/ffmpeg"):
        self.ffmpeg_location = ffmpeg_location

    @staticmethod
    def _from_payload(model, payload):
        """Build dataclass ``model`` from a JSON object, tolerating key drift.

        Keys that are not fields of ``model`` are ignored. Fields that are
        absent from ``payload`` and have no default are set to ``None``
        instead of making the constructor raise ``TypeError``.
        """
        kwargs = {}
        for spec in fields(model):
            if spec.name in payload:
                kwargs[spec.name] = payload[spec.name]
            elif spec.default is MISSING and spec.default_factory is MISSING:
                kwargs[spec.name] = None
        return model(**kwargs)

    @staticmethod
    async def retrieve_video_client(video_id: str) -> "YouTubeResponse":
        """Fetch and parse the player response for ``video_id``.

        Only ``videoDetails`` and the caption track list are parsed into
        dataclasses; the other sections are stored as received.

        Raises:
            Exception: If the endpoint does not answer with HTTP 200.
            KeyError: If the response has no ``videoDetails`` section.
        """
        logging.info("Retrieving video info")
        body = {
            "videoId": video_id,
            "context": {
                "client": {
                    "hl": "en",
                    "clientName": "WEB",
                    "clientVersion": Youtube.CLIENT_VERSION,
                },
                "request": {"useSsl": True},
            },
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(Youtube.API_URL, json=body)
        if resp.status_code != 200:
            logging.error("Failed to retrieve video info")
            raise Exception(f"Failed to retrieve video info: {resp.text}")
        data = resp.json()

        # For brevity, not all nested parsing implemented; you can use dacite
        # or Pydantic for full parsing.
        # NOTE(review): CaptionTrack.name stays the raw JSON value here; it is
        # not converted into a SimpleName.
        video_details = Youtube._from_payload(VideoDetails, data["videoDetails"])

        captions = None
        renderer = (data.get("captions") or {}).get(
            "playerCaptionsTracklistRenderer"
        )
        if renderer:
            captions = Captions(
                playerCaptionsTracklistRenderer=PlayerCaptionsTracklistRenderer(
                    captionTracks=[
                        Youtube._from_payload(CaptionTrack, track)
                        for track in renderer.get("captionTracks", [])
                    ],
                    audioTracks=[],
                    translationLanguages=[],
                    defaultAudioTrackIndex=renderer.get(
                        "defaultAudioTrackIndex", 0
                    ),
                )
            )

        return YouTubeResponse(
            responseContext=data.get("responseContext"),
            playabilityStatus=data.get("playabilityStatus"),
            streamingData=data.get("streamingData"),
            playerConfig=data.get("playerConfig"),
            videoDetails=video_details,
            microformat=data.get("microformat"),
            cards=data.get("cards"),
            trackingParams=data.get("trackingParams", ""),
            captions=captions,
        )

    @staticmethod
    async def retrieve_video_details(video_id: str) -> "VideoDetails":
        """Return only the ``videoDetails`` part of the player response."""
        video_info = await Youtube.retrieve_video_client(video_id)
        return video_info.videoDetails

    @staticmethod
    async def retrieve_text_from_captions(video_id: str, language_code: str) -> str:
        """Return the captions of ``video_id`` in ``language_code`` as SRT text.

        Both the requested code and each track's code are normalised through
        ``LANGUAGE_CODE_MAP`` before comparison, so ``"zh-HK"`` and ``"yue"``
        select the same tracks. Tracks whose ``asr`` attribute is set are
        skipped. Returns an empty string when no usable track is found.

        Raises:
            Exception: If the video has no caption information at all.
        """
        video_info = await Youtube.retrieve_video_client(video_id)
        if not video_info.captions:
            raise Exception("No captions found")
        tracks = video_info.captions.playerCaptionsTracklistRenderer.captionTracks

        wanted_code = LANGUAGE_CODE_MAP.get(language_code, language_code)
        # NOTE(review): auto-generated tracks may be flagged through
        # `kind == "asr"` rather than an `asr` key - confirm whether this
        # filter should also look at `kind`.
        matching = [
            track
            for track in tracks
            if track.asr is None
            and LANGUAGE_CODE_MAP.get(track.languageCode, track.languageCode)
            == wanted_code
        ]

        xml_subtitle = ""
        if matching:
            # When several tracks match, the last one wins; only that one is
            # downloaded.
            xml_subtitle = await Youtube.retrieve_subtitle_from_url(
                matching[-1].baseUrl
            )
        if not xml_subtitle:
            logging.warning(
                f"No subtitles found for video {video_id} in language {language_code}"
            )
            return ""
        return Srt.convert_xml_to_srt(xml_subtitle, language_code=language_code)

    @staticmethod
    async def retrieve_subtitle_from_url(url: str) -> str:
        """Download ``url`` and return the response body as text.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text

    @staticmethod
    async def download_audio_file(
        video_id: str, ffmpeg_location="/usr/bin/ffmpeg"
    ) -> Tuple[Any, float]:
        """Download the audio of ``video_id`` and load it into memory.

        The audio is extracted to ``./downloads/<video_id>.mp3``, loaded with
        its native sample rate, and the file is deleted again whether or not
        loading succeeds.

        NOTE(review): the download and decoding calls are synchronous and run
        directly inside this coroutine.

        Returns:
            ``(audio, sample_rate)`` as returned by ``librosa.load``.

        Raises:
            ValueError: If the audio is shorter than 30 seconds.
            Exception: Any download or decoding error, re-raised after logging.
        """
        path = f"./downloads/{video_id}.mp3"
        if os.path.exists(path):
            os.remove(path)
        full_url = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {
            "extract_audio": True,
            "format": "bestaudio/best",
            "outtmpl": path.replace(".mp3", ".%(ext)s"),
            "postprocessors": [
                {  # Extract audio using ffmpeg
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "192",
                }
            ],
            "ffmpeg_location": ffmpeg_location,  # Adjust path as needed
            "postprocessor_args": [
                "-ar",
                "24000",  # Convert to 24kHz sample rate
            ],
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as video:
                # download() takes a list of URLs.
                video.download([full_url])
            audio, sr = librosa.load(path, sr=None)
            # duration less than 30 seconds
            if len(audio) < sr * 30:
                logging.warning(
                    f"Audio duration for {video_id} is less than 30 seconds."
                )
                raise ValueError("Audio duration is too short.")
            return audio, sr
        except Exception as e:
            logging.error(f"Failed to download or process audio: {e}")
            raise
        finally:
            # Remove the file on every exit path so that failed or rejected
            # downloads do not accumulate on disk.
            if os.path.exists(path):
                os.remove(path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment