Skip to content

Instantly share code, notes, and snippets.

@indiejoseph
Created July 7, 2025 15:11
Show Gist options
  • Save indiejoseph/b7a4007a6ae1cd255ec3bbfcc5c40c6a to your computer and use it in GitHub Desktop.
Save indiejoseph/b7a4007a6ae1cd255ec3bbfcc5c40c6a to your computer and use it in GitHub Desktop.
YouTube API client
from dataclasses import dataclass
import yt_dlp
import librosa
import xml.etree.ElementTree as ET
import asyncio
from typing import List, Optional, Dict, Any
import httpx
import logging
import subprocess
import os
# Configure the root logger at import time so INFO-level records are emitted.
logging.basicConfig(level=logging.INFO)

# Caption language code -> short language code used by this module.
# Codes that are not listed are passed through unchanged by the callers,
# which look entries up with ``LANGUAGE_CODE_MAP.get(code, code)``.
# (This table was a ``static`` in the Rust version; here it is a
# module-level constant.)
LANGUAGE_CODE_MAP: Dict[str, str] = {
    "zh-HK": "yue",
    "yue-HK": "yue",
    "en": "en",
    "zh-CN": "zh",
    "zh-TW": "zh",
    "zh-Hant": "zh",
}
# --- Data Classes ---
@dataclass
class SimpleName:
    """Display name wrapper holding a single ``simpleText`` string.

    Used as the declared type of ``CaptionTrack.name``.
    """

    # Plain-text form of the name.
    simpleText: str
@dataclass
class CaptionTrack:
    """One entry of the ``captionTracks`` list in a player response.

    Instances are created by ``Youtube.retrieve_video_client`` directly from
    the JSON dicts of the response, so field names mirror the JSON keys.
    """

    # URL the caption XML is downloaded from.
    baseUrl: str
    # Display name of the track.
    # NOTE(review): the JSON value is passed through unconverted when the
    # track is built, so at runtime this is presumably a plain dict rather
    # than a SimpleName instance - confirm before relying on attribute access.
    name: SimpleName
    vssId: str
    # Language code of the track; callers map it through LANGUAGE_CODE_MAP
    # before comparing it with the requested language.
    languageCode: str
    isTranslatable: bool
    trackName: str
    kind: Optional[str] = None
    # ``Youtube.retrieve_text_from_captions`` only selects tracks whose
    # ``asr`` is None.
    asr: Optional[str] = None
@dataclass
class PlayerCaptionsTracklistRenderer:
    """Container for the caption tracks of a video.

    Mirrors the ``playerCaptionsTracklistRenderer`` object found under
    ``captions`` in the player response.
    """

    # Caption tracks available for the video.
    captionTracks: List[CaptionTrack]
    # Not parsed: ``Youtube.retrieve_video_client`` always stores an empty list.
    audioTracks: Any
    # Not parsed: ``Youtube.retrieve_video_client`` always stores an empty list.
    translationLanguages: Any
    # Taken from the response; 0 when the response omits it.
    defaultAudioTrackIndex: int
@dataclass
class Captions:
    """Top-level ``captions`` object of a player response."""

    # The track list; ``Youtube.retrieve_text_from_captions`` reads
    # ``playerCaptionsTracklistRenderer.captionTracks`` from it.
    playerCaptionsTracklistRenderer: PlayerCaptionsTracklistRenderer
@dataclass
class VideoDetails:
    """Metadata of a single video (the ``videoDetails`` object of a player response).

    ``Youtube.retrieve_video_client`` builds this from the JSON object, so
    the field names mirror the JSON keys. Every field is required.
    """

    videoId: str
    title: str
    # Declared as a string, not a number.
    lengthSeconds: str
    isOwnerViewing: bool
    channelId: str
    shortDescription: str
    # Declared as a string, not a number.
    viewCount: str
    author: str
    isPrivate: bool
    isLiveContent: bool
    isCrawlable: bool
    # Assumed to be a dict describing the thumbnail in various sizes.
    thumbnail: Dict[str, Any]
    allowRatings: bool
    isUnpluggedCorpus: bool
    keywords: List[str]
@dataclass
class YouTubeResponse:
    """Parsed player response as returned by ``Youtube.retrieve_video_client``.

    Only ``videoDetails`` and ``captions`` are converted to dataclasses; the
    fields typed ``Any`` hold the raw JSON value (or None when the key is
    absent from the response).
    """

    responseContext: Any
    playabilityStatus: Any
    streamingData: Any
    playerConfig: Any
    # Parsed video metadata.
    videoDetails: VideoDetails
    microformat: Any
    cards: Any
    # Empty string when the response has no ``trackingParams``.
    trackingParams: str
    # None when the response carries no captions.
    captions: Optional[Captions]
@dataclass
class Client:
    """``client`` section of a player request context.

    The field names match the keys of the ``context.client`` dict that
    ``Youtube.retrieve_video_client`` sends (``hl``, ``clientName``,
    ``clientVersion``); that method currently builds the dict by hand rather
    than from this class.
    """

    hl: str
    clientName: str
    clientVersion: str
@dataclass
class Request:
    """``request`` section of a player request context.

    Matches the ``context.request`` dict (``{"useSsl": True}``) sent by
    ``Youtube.retrieve_video_client``.
    """

    useSsl: bool
@dataclass
class Context:
    """``context`` object of a player request: client info plus request flags."""

    client: Client
    request: Request
@dataclass
class VideoInfoRequest:
    """Typed shape of the player request body (``videoId`` plus ``context``).

    ``Youtube.retrieve_video_client`` sends a dict with these same two keys
    but builds it by hand rather than from this class.
    """

    videoId: str
    context: Context
# --- SRT Conversion ---
class Srt:
    """Convert caption XML made of ``<text start=".." dur="..">`` nodes to SRT."""

    @staticmethod
    def _parse_cues(xml: str):
        """Return ``(start, dur, text)`` string tuples for every ``text`` node.

        ``start`` and ``dur`` default to ``"0"`` when the attribute is missing;
        newlines inside the cue text are replaced by spaces and the text is
        stripped.
        """
        root = ET.fromstring(xml)
        return [
            (
                node.attrib.get("start", "0"),
                node.attrib.get("dur", "0"),
                (node.text or "").replace("\n", " ").strip(),
            )
            for node in root.findall(".//text")
        ]

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Format a second count as an SRT timestamp ``HH:MM:SS,mmm``.

        The value is rounded to whole milliseconds once and then split, so a
        fraction such as ``.9996`` carries into the seconds field instead of
        producing a four-digit millisecond part.
        """
        total_millis = int(round(seconds * 1000))
        hours, remainder = divmod(total_millis, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        secs, millis = divmod(remainder, 1000)
        return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"

    @staticmethod
    def convert_xml_to_srt(
        xml: str,
        language_code: str,
        dual_xml=None,
        dual_subtitles_code=None,
        just_text=False,
    ) -> str:
        """Render caption XML as SRT text (or as bare text lines).

        Args:
            xml: Caption document for the main language.
            language_code: Language of ``xml``; mapped through
                ``LANGUAGE_CODE_MAP`` (unknown codes are used as-is) and
                prepended to every cue as ``(code)``.
            dual_xml: Optional second caption document. Its cues are paired
                with the main cues by position, not by timestamp.
            dual_subtitles_code: Language of ``dual_xml``, mapped the same
                way. The second line is only emitted when this is given.
            just_text: When true, omit the index and timing lines.

        Returns:
            The cues joined by a blank line; each cue ends with a newline.

        Raises:
            xml.etree.ElementTree.ParseError: if either document is not
                well-formed XML.
            ValueError: if a ``start`` or ``dur`` attribute is not a number.
        """
        language_label = LANGUAGE_CODE_MAP.get(language_code, language_code)
        dual_label = None
        dual_cues = []
        if dual_xml:
            if dual_subtitles_code:
                dual_label = LANGUAGE_CODE_MAP.get(
                    dual_subtitles_code, dual_subtitles_code
                )
            dual_cues = Srt._parse_cues(dual_xml)

        entries = []
        for idx, (start, dur, text) in enumerate(Srt._parse_cues(xml)):
            start_secs = float(start)
            end_secs = start_secs + float(dur)
            combined_text = f"({language_label}){text}"
            # The secondary cue is appended only when its language label is
            # known; without a label it is left out.
            if idx < len(dual_cues) and dual_label:
                dual_text = dual_cues[idx][2]
                combined_text += f"\n({dual_label}){dual_text}"
            if just_text:
                entries.append(f"{combined_text}\n")
            else:
                start_time = Srt._format_timestamp(start_secs)
                end_time = Srt._format_timestamp(end_secs)
                entries.append(
                    f"{idx+1}\n{start_time} --> {end_time}\n{combined_text}\n"
                )
        return "\n".join(entries)
# --- Main Client ---
class Youtube:
    """Client for YouTube's ``youtubei/v1/player`` endpoint.

    Offers static coroutines to fetch video metadata, render a caption track
    as SRT text, and download a video's audio as decoded samples.
    """

    API_URL = "https://www.youtube.com/youtubei/v1/player"
    # Sent as ``clientVersion`` in the request context.
    CLIENT_VERSION = "2.20241107.11.00"

    def __init__(self, ffmpeg_location="/usr/bin/ffmpeg"):
        """Store the ffmpeg path.

        NOTE(review): no method of this class reads ``self.ffmpeg_location``;
        ``download_audio_file`` is static and takes its own
        ``ffmpeg_location`` argument.
        """
        self.ffmpeg_location = ffmpeg_location

    @staticmethod
    def _from_known_fields(model, raw):
        """Build dataclass ``model`` from dict ``raw``, ignoring undeclared keys.

        Keys the dataclass does not declare are dropped so that an extra key
        in the response does not make the constructor raise ``TypeError``.
        A missing required field still raises ``TypeError``.
        """
        # Local import: the file itself only imports ``dataclass``.
        from dataclasses import fields

        declared = {f.name for f in fields(model)}
        return model(
            **{key: value for key, value in raw.items() if key in declared}
        )

    @staticmethod
    async def retrieve_video_client(video_id: str) -> YouTubeResponse:
        """Fetch and parse the player response for ``video_id``.

        Raises:
            Exception: if the endpoint answers with a status other than 200.
            KeyError: if the response has no ``videoDetails`` object.
        """
        logging.info("Retrieving video info")
        body = {
            "videoId": video_id,
            "context": {
                "client": {
                    "hl": "en",
                    "clientName": "WEB",
                    "clientVersion": Youtube.CLIENT_VERSION,
                },
                "request": {"useSsl": True},
            },
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(Youtube.API_URL, json=body)
        if resp.status_code != 200:
            logging.error("Failed to retrieve video info")
            raise Exception(f"Failed to retrieve video info: {resp.text}")
        data = resp.json()
        # For brevity, not all nested parsing is implemented; dacite or
        # Pydantic could be used for full parsing.
        video_details = Youtube._from_known_fields(
            VideoDetails, data["videoDetails"]
        )
        captions = None
        # Tolerate a missing/empty "captions" object and a captions object
        # without a track-list renderer.
        renderer = (data.get("captions") or {}).get(
            "playerCaptionsTracklistRenderer"
        )
        if renderer:
            captions = Captions(
                playerCaptionsTracklistRenderer=PlayerCaptionsTracklistRenderer(
                    captionTracks=[
                        Youtube._from_known_fields(CaptionTrack, ct)
                        for ct in renderer.get("captionTracks", [])
                    ],
                    audioTracks=[],
                    translationLanguages=[],
                    defaultAudioTrackIndex=renderer.get(
                        "defaultAudioTrackIndex", 0
                    ),
                )
            )
        return YouTubeResponse(
            responseContext=data.get("responseContext"),
            playabilityStatus=data.get("playabilityStatus"),
            streamingData=data.get("streamingData"),
            playerConfig=data.get("playerConfig"),
            videoDetails=video_details,
            microformat=data.get("microformat"),
            cards=data.get("cards"),
            trackingParams=data.get("trackingParams", ""),
            captions=captions,
        )

    @staticmethod
    async def retrieve_video_details(video_id: str) -> VideoDetails:
        """Return only the ``videoDetails`` part of the player response."""
        video_info = await Youtube.retrieve_video_client(video_id)
        return video_info.videoDetails

    @staticmethod
    async def retrieve_text_from_captions(video_id: str, language_code: str) -> str:
        """Return the captions of ``video_id`` in ``language_code`` as SRT text.

        ``language_code`` is compared against each track's language code
        after mapping the latter through ``LANGUAGE_CODE_MAP``. Only tracks
        whose ``asr`` attribute is None are considered.

        Returns:
            SRT text, or ``""`` (with a warning logged) when no matching
            track exists or the track download is empty.

        Raises:
            Exception: if the video has no captions at all.
        """
        video_info = await Youtube.retrieve_video_client(video_id)
        if not video_info.captions:
            raise Exception("No captions found")
        tracks = video_info.captions.playerCaptionsTracklistRenderer.captionTracks
        matching = [
            track
            for track in tracks
            if track.asr is None
            and LANGUAGE_CODE_MAP.get(track.languageCode, track.languageCode)
            == language_code
        ]
        xml_subtitle = ""
        if matching:
            # When several tracks map to the same code the last one wins
            # (same outcome as before); only that track is downloaded.
            xml_subtitle = await Youtube.retrieve_subtitle_from_url(
                matching[-1].baseUrl
            )
        if not xml_subtitle:
            logging.warning(
                f"No subtitles found for video {video_id} in language {language_code}"
            )
            return ""
        return Srt.convert_xml_to_srt(xml_subtitle, language_code=language_code)

    @staticmethod
    async def retrieve_subtitle_from_url(url: str) -> str:
        """GET ``url`` and return the response body as text.

        Raises:
            httpx.HTTPStatusError: for 4xx/5xx answers (``raise_for_status``).
        """
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text

    @staticmethod
    async def download_audio_file(video_id: str, ffmpeg_location="/usr/bin/ffmpeg") -> tuple:
        """Download the audio of ``video_id`` and return it as decoded samples.

        The audio is extracted to ``./downloads/<video_id>.mp3`` through
        yt_dlp/ffmpeg, loaded with ``librosa.load(path, sr=None)`` and the
        temporary mp3 is deleted again, on failure as well as on success.

        Args:
            video_id: ID used to build the ``watch?v=`` URL and the file name.
            ffmpeg_location: Path handed to yt_dlp as ``ffmpeg_location``.

        Returns:
            ``(audio, sr)`` exactly as returned by ``librosa.load``.

        Raises:
            ValueError: if the audio holds fewer than 30 seconds of samples.
            Exception: any download or decoding error, logged and re-raised.
        """
        path = f"./downloads/{video_id}.mp3"
        if os.path.exists(path):
            os.remove(path)
        full_url = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {
            "extract_audio": True,
            "format": "bestaudio/best",
            "outtmpl": path.replace(".mp3", ".%(ext)s"),
            "postprocessors": [
                {  # Extract audio using ffmpeg
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "192",
                }
            ],
            "ffmpeg_location": ffmpeg_location,  # Adjust path as needed
            "postprocessor_args": [
                "-ar",
                "24000",  # Convert to 24kHz sample rate
            ],
        }

        def _download_and_load():
            # Blocking work: network download, ffmpeg conversion, decoding.
            with yt_dlp.YoutubeDL(ydl_opts) as video:
                video.download([full_url])
            return librosa.load(path, sr=None)

        try:
            # Run the blocking part in the default executor so the event
            # loop stays responsive while the download is in progress.
            loop = asyncio.get_running_loop()
            audio, sr = await loop.run_in_executor(None, _download_and_load)
            # duration less than 30 seconds
            if len(audio) < sr * 30:
                logging.warning(
                    f"Audio duration for {video_id} is less than 30 seconds."
                )
                raise ValueError("Audio duration is too short.")
            return audio, sr
        except Exception as e:
            logging.error(f"Failed to download or process audio: {e}")
            raise
        finally:
            # Remove the temporary file on every exit path, not only success.
            if os.path.exists(path):
                os.remove(path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment