Created
July 17, 2021 19:56
-
-
Save rebane2001/a8fdc9c25359a26b9b17a3123e293447 to your computer and use it in GitHub Desktop.
Somewhat hastily written script to pull metadata for every video in an archive (recommended to run as a daily cron job)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import re | |
import os | |
import time | |
import json | |
from datetime import datetime, timezone | |
import random | |
# Put your YouTube API keys here, the more the merrier (one key can do 500k vids a day)
youtube_api_keys = ["YOUR_API_KEYS_HERE"]

# Today's date as a YYYY-MM-DD string; names the daily output folder.
datestr = datetime.today().strftime('%Y-%m-%d')

# Output paths (one folder per day).
jsonlpath = f"daily_data/{datestr}/{datestr}.jsonl"
logpath = f"daily_data/{datestr}/{datestr}.log"

# Text files scanned for video/playlist links.
video_id_sources = ["/path/to/links.txt"]

# How many attempts before a failing playlist/batch is logged and skipped.
error_retries = 10
def extractVids(link):
    """Extract every 11-character YouTube video ID found in *link* text.

    Recognizes watch URLs (?v= / &v=), youtu.be short links, Shorts,
    embeds, and youtube-dl ``archive.txt`` lines ("youtube <id>").
    Returns IDs in pattern-scan order; duplicates are NOT removed
    (callers add them to a set).
    """
    # Each pattern captures exactly one 11-char URL-safe video ID.
    patterns = (
        r'/watch\?v=([A-Za-z0-9_\-]{11})',
        r'&v=([A-Za-z0-9_\-]{11})',
        # Dot escaped so only the literal "youtu.be" host matches,
        # not e.g. "youtuXbe".
        r'youtu\.be/([A-Za-z0-9_\-]{11})',
        r'/shorts/([A-Za-z0-9_\-]{11})',
        r'/embed/([A-Za-z0-9_\-]{11})',
        # archive.txt format
        r'youtube ([A-Za-z0-9_\-]{11})',
    )
    matches = []
    for pattern in patterns:
        matches.extend(re.findall(pattern, link))
    return matches
def extractPlaylists(link):
    """Pull every playlist ID (16-64 URL-safe chars) out of *link* text.

    Matches /playlist URLs where the ID is either the first query
    parameter (?list=...) or a later one (&list=...).
    """
    found = []
    for pattern in (r'/playlist\?list=([A-Za-z0-9_\-]{16,64})',
                    r'/playlist\?.*?&list=([A-Za-z0-9_\-]{16,64})'):
        found.extend(re.findall(pattern, link))
    return found
# Some snippets borrowed from https://github.com/itallreturnstonothing/panicpony/
def get_playlists_page(playlist_id, page_token=None):
    """Fetch one page (up to 50 items) of a playlist from the YouTube Data API.

    Returns (items, next_page_token); next_page_token is None on the last
    page. On a non-200 response returns (None, None) so callers can detect
    the failure.
    """
    params = {
        "playlistId": playlist_id,
        "part": "status,snippet,contentDetails",
        "maxResults": 50,
        # Spread quota usage across all configured keys.
        "key": random.choice(youtube_api_keys),
    }
    if page_token:
        params["pageToken"] = page_token
    # params= gives proper URL-encoding; timeout so a hung connection
    # can't stall the whole daily run (callers already retry on exceptions).
    response = requests.get(
        "https://www.googleapis.com/youtube/v3/playlistItems",
        params=params,
        timeout=60,
    )
    if not response.status_code == 200:
        print("Something not right!")
        print(playlist_id)
        return (None, None)
    precious_data = response.json()
    return (
        precious_data["items"],
        precious_data.get("nextPageToken")
    )
def get_videos_page(video_ids):
    """Fetch full metadata for up to 50 video IDs in one API call.

    Returns the list of video resource dicts, or None on a non-200
    response (callers treat None as a retryable failure).
    """
    params = {
        "id": ",".join(video_ids),
        "part": ("contentDetails,id,liveStreamingDetails,localizations,player,"
                 "recordingDetails,snippet,statistics,status,topicDetails"),
        "maxResults": 50,
        # Spread quota usage across all configured keys.
        "key": random.choice(youtube_api_keys),
    }
    # params= gives proper URL-encoding; timeout so a hung connection
    # can't stall the whole daily run (callers already retry on exceptions).
    response = requests.get(
        "https://www.googleapis.com/youtube/v3/videos",
        params=params,
        timeout=60,
    )
    if not response.status_code == 200:
        print("Something not right!")
        return None
    return response.json()["items"]
# Module-level page counter, exposed for progress logging.
pl_page = 0


def get_all_videos_from_playlist(playlist_id):
    """Walk every page of *playlist_id* and return all of its items as one list."""
    global pl_page
    pl_page = 1
    writeLog(f"Fetching playlist {playlist_id}")
    videos, token = get_playlists_page(playlist_id)
    collected = list(videos)
    # Keep requesting pages until the API stops handing back a page token.
    while token:
        pl_page += 1
        writeLog(f"Fetching playlist (page {pl_page}, {pl_page*50} videos)")
        page_items, token = get_playlists_page(playlist_id, token)
        collected.extend(page_items)
    return collected
def getVideoIds():
    """Collect the set of all video IDs referenced by the source files.

    Scans every file in video_id_sources for playlist links (expanding each
    playlist through the API, retrying up to error_retries times) and for
    direct video links. Returns a set of 11-char video ID strings.
    """
    videoids = set()
    for filename in video_id_sources:
        print(filename)
        with open(filename, "r") as f:
            text = f.read()
        for playlist in extractPlaylists(text):
            for i in range(error_retries):
                try:
                    videos = get_all_videos_from_playlist(playlist)
                    for vid in videos:
                        videoids.add(vid["snippet"]["resourceId"]["videoId"])
                    break
                except Exception as e:
                    if i < error_retries - 1:
                        print(repr(e))
                        print(f"Retry attempt {i+2} of {error_retries}...")
                    else:
                        # Log and move on to the next playlist so one bad
                        # playlist can't abort the whole run.
                        # BUG FIX: the original then did
                        # `videoids = videoids[50:]`, which raises TypeError
                        # (sets are not sliceable) and crashed the script.
                        writeLog(f"Playlist {playlist} failed with error {repr(e)}")
        for videoid in extractVids(text):
            videoids.add(videoid)
    return videoids
# Write timestamped log to file and console
def writeLog(message):
    """Append *message* to the daily log file and echo it to stdout,
    prefixed with a UTC timestamp ([YYYY-MM-DDTHH:MM:SS])."""
    # datetime.utcnow() is deprecated (Python 3.12+); take an aware UTC
    # "now" and drop tzinfo so the logged format stays byte-identical
    # (no "+00:00" suffix).
    stamp = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    msg = f"[{stamp.isoformat()}] {message}"
    with open(logpath, "a") as f:
        f.write(f"{msg}\n")
    print(msg)
# Main stuff
def downloadMetadata(videoids):
    """Download metadata for every ID in *videoids* (a sliceable sequence,
    e.g. a sorted list) in batches of 50, appending one JSON object per
    line to jsonlpath.

    Each batch is retried up to error_retries times; a batch that keeps
    failing is logged and skipped so the run can still finish.
    """
    orig_len = len(videoids)
    while len(videoids) > 0:
        done = orig_len - len(videoids)
        print(f"{round((done / orig_len) * 1000) / 10}% ({done}/{orig_len})")
        for i in range(error_retries):
            try:
                batch = get_videos_page(videoids[:50])
                # Serialize the whole batch before touching the file so a
                # mid-batch failure can't leave duplicate lines behind when
                # the batch is retried. (If batch is None — HTTP error —
                # this raises TypeError and falls into the retry path.)
                lines = [f"{json.dumps(video)}\n" for video in batch]
                with open(jsonlpath, "a") as f:
                    f.writelines(lines)
                videoids = videoids[50:]
                break
            except Exception as e:
                if i < error_retries - 1:
                    print(repr(e))
                    print(f"Retry attempt {i+2} of {error_retries}...")
                else:
                    writeLog(f"Batch {videoids[:50]} failed with error {repr(e)}")
                    videoids = videoids[50:]
    print(f"100% ({orig_len}/{orig_len})")
def main():
    """Entry point: create today's output folder, gather IDs, download metadata."""
    daydir = f"daily_data/{datestr}"
    # Refuse to run twice in one day rather than mixing/duplicating output.
    if os.path.isdir(daydir):
        print("Path already exists :(")
        return
    # makedirs so the very first run works even when daily_data/ doesn't
    # exist yet (os.mkdir would fail on the missing parent directory).
    os.makedirs(daydir)
    writeLog(f"Good morning Ponyville - {datestr}")
    writeLog(f"Getting video IDs...")
    videoids = getVideoIds()
    writeLog(f"Got {len(videoids)} IDs!")
    # One key covers roughly 500k videos/day; warn before burning quota.
    if len(videoids) > 450000 * len(youtube_api_keys):
        print(f"Warning! Over {450000 * len(youtube_api_keys)} videos, we may run out of API requests.")
        print("Press any key to continue anyways...")
        input()
    writeLog(f"Sorting...")
    # Sorted list (not a set) so downloadMetadata can slice it in batches of 50.
    videoids = sorted(videoids)
    writeLog(f"Downloading...")
    downloadMetadata(videoids)
    writeLog(f"Done for today!")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment