Created
May 4, 2023 14:08
-
-
Save ChuckMac/788e837ff0c4b90e2fe46047d0713e05 to your computer and use it in GitHub Desktop.
Watch and auto-download new episodes of a YouTube channel and rename to Plex friendly format
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script can be used to attempt to download new videos from a | |
YouTube channel and rename them to a Plex friendly format. | |
ex: s01e01 - Video Title[ID].mp4 | |
s01e01 - Video Title.json | |
s01e01 - Video Title.webp | |
It will skip any videos that are shorter than the specified | |
minimum length (in seconds) to avoid downloading shorts. | |
It will also skip any videos that already exist in the local archive directory. | |
The script will keep track of the last downloaded episode and use that to | |
calculate the next episode number. | |
Note: the public channel feed only lists the last 15 videos | |
""" | |
import http.client | |
import json | |
import xml.etree.ElementTree as ET | |
import glob | |
import os | |
import yt_dlp | |
# User config values - change these to meet your needs
yt_channel_id = "UC6n8I1UDTKP1IWjQMg6_TwA"  # channel id appended to the /feeds/videos.xml URL
local_archive = r"H:\Seagate\Media\YouTube\B1M"  # Plex library folder; scanned for existing episode .json files, renamed files land here
local_tmp = r"H:\Seagate\Media\Downloads\youtube"  # yt-dlp download target before files are renamed/moved
video_format = "bestvideo[vcodec!^=av01][ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]"  # passed to yt-dlp as the 'format' selector
min_vid_length = 120  # videos with duration <= this many seconds are skipped (filters out shorts)
default_start = "s01e01"  # episode tag used when the archive holds no .json files yet
# Global variables
next_string = ""  # sXXeYY tag for the next episode to be written
existing_episodes = []  # YouTube video ids already present in local_archive
def parse_local_yt_files():
    """
    Scan the local archive for existing episode ``.json`` info files.

    Side effects (module globals):
        next_string: set to the episode tag following the newest archived
            file, or to ``default_start`` when the archive is empty.
        existing_episodes: appended with the YouTube video id stored in
            each ``.json`` file, used later to skip re-downloads.
    """
    global next_string, existing_episodes
    print("Checking local archive - " + local_archive)
    # Glob with an absolute pattern instead of os.chdir() so the
    # process-wide working directory is left untouched.
    json_files = sorted(glob.glob(os.path.join(local_archive, "*.json")))
    # Calculate next episode
    if not json_files:
        print("No existing Episodes found, using default start: " + default_start)
        next_string = default_start
    else:
        # Filenames look like "sXXeYY - Title.json"; the first
        # space-separated token is the episode tag.
        last_episode = os.path.basename(json_files[-1]).split(" ")[0]
        print("Last episode found: " + last_episode)
        update_next_string(last_episode)
    for json_file in json_files:
        with open(json_file, encoding="utf8") as fp:
            existing_episodes.append(json.load(fp)['id'])
def update_next_string(last_string):
    """
    Advance the global ``next_string`` to the episode after *last_string*.

    Parameters:
        last_string (str): last episode tag in sXXeYY form (e.g. "s01e09").

    Returns:
        str: the new value of ``next_string`` (e.g. "s01e10").
    """
    global next_string
    season, _, episode = last_string.partition("e")
    # Zero-pad to at least two digits so the lexicographic sort of archive
    # filenames in parse_local_yt_files() keeps working past episode 9:
    # "s01e02" < "s01e10", whereas the unpadded "s01e2" sorts AFTER "s01e10"
    # and would make the wrong file look like the newest episode.
    next_string = "{}e{:02d}".format(season, int(episode) + 1)
    return next_string
def get_channel_feed():
    """
    Fetch the channel's Atom feed XML over HTTPS and hand it to parse_feed().

    Exits the script (SystemExit) if the request fails or returns non-200.
    """
    print("Getting YouTube Feed...")
    conn = http.client.HTTPSConnection("www.youtube.com")
    feed_path = "/feeds/videos.xml?channel_id=" + yt_channel_id
    try:
        conn.request("GET", feed_path)
    except Exception as err:
        conn.close()
        print(">> Request failed, Unable to get YouTube Feed: {}".format(err))
        raise SystemExit(err)
    resp = conn.getresponse()
    if resp.status != 200:
        conn.close()
        print(">> Request failed, Non-200 received getting YouTube Feed: {}".format(resp.status))
        raise SystemExit(resp.status)
    body = resp.read()
    conn.close()
    parse_feed(body)
def parse_feed(data):
    """
    Parse the channel's Atom feed and download any new, long-enough videos.

    For each <entry> whose id is not already in existing_episodes, the video
    is probed first (download=False) so shorts can be rejected by duration,
    then downloaded to local_tmp, and every matching sidecar file is moved
    into local_archive prefixed with the current next_string episode tag.

    NOTE(review): the feed appears to list entries newest-first, so if more
    than one new video is found in a single run they would be numbered in
    reverse chronological order - confirm this is intended.

    Parameters:
        data (bytes | str): the Atom XML document returned by the feed endpoint.
    """
    tree = ET.fromstring(data)
    # Entries are Atom-namespaced; per the module docstring, the public feed
    # only lists the last ~15 videos.
    entries = tree.findall('{http://www.w3.org/2005/Atom}entry')
    ydl_opts = {
        'format': video_format,
        # Temp filename template: "<title>[<video id>].<ext>" inside local_tmp.
        'outtmpl': f'{local_tmp}/%(title)s[%(id)s].%(ext)s',
        'quiet': True,
        'addmetadata': True,
        'writeinfojson': True,   # sidecar .json (read back by parse_local_yt_files)
        'writethumbnail': True,  # sidecar thumbnail (e.g. .webp)
        'ignoreerrors': True,
        # NOTE(review): assumes the aria2c binary is installed and on PATH - confirm.
        'external_downloader': 'aria2c'
    }
    for entry in entries:
        # <id> text looks like "yt:video:VIDEOID"; keep the part after the
        # last colon. (NOTE: "id" shadows the builtin of the same name.)
        id = entry.find('{http://www.w3.org/2005/Atom}id').text
        id = id.split(':')[-1]
        print("--------------------")
        print("Checking entry: " + id)
        if id not in existing_episodes:
            video_url = "https://www.youtube.com/watch?v=" + id
            # First pass: metadata only (download=False), so shorts can be
            # rejected by length without downloading anything.
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                video_info = ydl.extract_info(video_url, download=False)
                duration = video_info['duration']
            if duration > min_vid_length:  # check its not a short
                print("Downloading: " + id + " - " + video_info['title'])
                # Second pass: actually download the video plus sidecar files.
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    video_info = ydl.extract_info(video_url, download=True)
                    duration = video_info['duration']
                print("Downloading Complete")
                # Rename the downloaded files: prefix every file belonging to
                # this video (video, .json, thumbnail) with the episode tag.
                # NOTE(review): matching by raw title - titles that yt-dlp
                # sanitises for the filesystem may not match startswith; verify.
                for file in os.listdir(local_tmp):
                    if file.startswith(video_info['title']):
                        print("Moving to: " + local_archive + "/" + next_string + " - " + file)
                        os.rename(f'{local_tmp}/{file}', f'{local_archive}/{next_string} - {file}')
                # Advance the episode counter once per downloaded video.
                update_next_string(next_string)
            else:
                print("Episode too short [min: " + str(min_vid_length) + "s / ep: "
                      + str(duration) + "s], skipping: "
                      + id + " - " + video_info['title'])
        else:
            print("Episode already exists, skipping: " + id)
        print("--------------------")
# Entry point: inventory the local archive first so next_string and
# existing_episodes are populated, then fetch and process the channel feed.
# Guarded so merely importing this module does not trigger downloads.
if __name__ == "__main__":
    print("-- STARTING --")
    parse_local_yt_files()
    get_channel_feed()
    print("-- FINISHED --")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment