A simple YouTube play queue that reads from YouTube's per-channel RSS feeds, with sync to a GitHub gist
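The script keeps its state in a youtube.json file next to it. A minimal seed file, assuming only the two keys the script reads before its first fetch, looks like:

  {
    "fetch_time": 0,
    "videos": []
  }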
#!/usr/bin/python3
import json, os, sys, time

QUEUE_FILE = "youtube.json"

def dump_queue(Q):
    with open(QUEUE_FILE, "w") as f:
        json.dump(Q, f, indent=2)

def read_queue():
    with open(QUEUE_FILE) as f:
        return json.load(f)

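# get_sub_urls() reads subscription_manager.xml, the OPML export from
# YouTube's subscription manager. The root[0][0] walk assumes a layout like:
#   <opml><body><outline title="YouTube Subscriptions">
#     <outline title="..." xmlUrl="https://www.youtube.com/feeds/videos.xml?channel_id=..."/>
#     ...
#   </outline></body></opml>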
def get_sub_urls():
    from xml.etree import ElementTree
    root = ElementTree.parse("subscription_manager.xml").getroot()
    return [sub.get("xmlUrl") for sub in root[0][0]]

def get_vids_from_sub(url, from_time):
    import feedparser
    feed = feedparser.parse(url)
    new_vids = []
    for entry in feed.entries:
        # published_parsed is a UTC struct_time, but mktime() interprets it
        # as local time; renew_queue() compensates with time.timezone.
        unix_time = time.mktime(entry["published_parsed"])
        if unix_time > from_time:
            new_vids.append({
                "channel": entry["author"],
                "title": entry["title"],
                "link": entry["link"],
                "unix_time": unix_time,
            })
    new_vids.sort(key=lambda v: v["unix_time"])
    return new_vids

def get_duration(url):
    # Naive scrape of the ISO-8601 duration (e.g. PT4M13S) without BS.
    # Assumes any units present appear in H/M/S order with none skipped.
    from urllib import request
    body = request.urlopen(url).read().decode("utf8")
    for line in body.split("\n"):
        if 'itemprop="duration"' in line:
            start = line.find("PT") + 2
            end = line.find("S", start)  # search after "PT", not from 0
            secs = 0
            for part in line[start:end].replace("H", "M").split("M"):
                secs = secs * 60 + int(part)
            return secs

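# Assumes links of the form https://www.youtube.com/watch?v=<id>, which is
# what the subscription feeds above produce.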
def id_from_url(url):
    id_ind = url.find("v=") + 2
    return url[id_ind:]

def download_thumbnail(url):
    from urllib import request
    destdir = f"{os.environ['HOME']}/.config/youtube/"
    os.makedirs(destdir, exist_ok=True)  # would crash on a fresh setup otherwise
    vid_id = id_from_url(url)  # renamed to avoid shadowing the id() builtin
    request.urlretrieve(f"https://i1.ytimg.com/vi/{vid_id}/hqdefault.jpg", f"{destdir}{vid_id}.jpg")

def renew_queue():
    Q = read_queue()
    last_fetch = Q["fetch_time"]
    # Shift by time.timezone so the baseline matches the mktime() values
    # produced in get_vids_from_sub().
    new_fetch = time.time() + time.timezone
    new_vids = []
    sub_urls = get_sub_urls()
    for i, sub_url in enumerate(sub_urls):
        print(f"Fetching subscriptions {i+1}/{len(sub_urls)}", end="\r")
        new_vids.extend(get_vids_from_sub(sub_url, last_fetch))
    print()
    Q["fetch_time"] = new_fetch
    Q["videos"] += new_vids
    print(f"Added {len(new_vids)} videos to queue")
    dump_queue(Q)
    for i, video in enumerate(new_vids):
        print(f"Downloading thumbnails {i+1}/{len(new_vids)}", end="\r")
        link = video["link"]
        video["duration"] = get_duration(link)
        dump_queue(Q)  # save progress after each video
        download_thumbnail(link)
    print()

def push_queue():
    from github import Github, InputFileContent
    from getpass import getpass
    queue_content = open(QUEUE_FILE).read()
    Q = json.loads(queue_content)
    username = input("Username: ")
    passwd = getpass("Password: ")
    g = Github(username, passwd)
    user = g.get_user()
    try:
        if "gist_id" in Q:
            gist_id = Q["gist_id"]
            gist = g.get_gist(gist_id)
            gist_files = {QUEUE_FILE: InputFileContent(queue_content, QUEUE_FILE)}
            gist.edit("Youtube Queue", gist_files)
        else:
            # Create a placeholder gist first so its id can be stored in the
            # queue file before the real content is uploaded.
            gist_files = {QUEUE_FILE: InputFileContent("Nothing", QUEUE_FILE)}
            gist = user.create_gist(False, gist_files, "Youtube Queue")
            print(f"Created new gist: {gist.id}")
            Q["gist_id"] = gist.id
            queue_content = json.dumps(Q, indent=2)
            dump_queue(Q)
            gist_files = {QUEUE_FILE: InputFileContent(queue_content, QUEUE_FILE)}
            gist.edit("Youtube Queue", gist_files)
    except Exception as e:
        print("Github error:", e)

def pull_queue():
    from getpass import getpass
    from github import Github
    Q = read_queue()
    username = input("Username: ")
    passwd = getpass("Password: ")
    g = Github(username, passwd)
    user = g.get_user()
    try:
        if "gist_id" in Q:
            gist_id = Q["gist_id"]
            gist = g.get_gist(gist_id)
            with open(QUEUE_FILE, "w") as f:
                f.write(gist.files[QUEUE_FILE].content)
        else:
            print("No Gist ID")
    except Exception as e:
        print("Github error:", e)

def list_videos():
    print("{:20}{:80}{}".format("Channel", "Title", "Time"))
    for vid in read_queue()["videos"]:
        print("{:20}{:80}{}".format(vid["channel"], vid["title"], time.ctime(vid["unix_time"])))

def play_queue(resume=False):
    from subprocess import Popen, PIPE
    if resume:
        Q = read_queue()
        if "last_played" in Q:
            Popen("mpv " + Q["last_played"], shell=True)
    while True:
        Q = read_queue()
        videos = Q["videos"]
        if len(videos) == 0:
            print("No videos in queue.")
            break
        channel_width = max(map(lambda v: len(v["channel"]), videos)) + 2
        # Each line carries its link after a non-breaking space (\xa0); fzf
        # hides it via --with-nth=1 but still prints it on selection.
        fzf_lines = [f"{'Channel':{channel_width}}Title"]
        fzf_lines.extend(f"{v['channel']:{channel_width}}{v['title']}\xa0{v['link']}"
                         for v in videos)
        fzf_input = "\n".join(fzf_lines)
        fzf_binds = [
            ("ctrl-f", "page-down"),
            ("ctrl-b", "page-up"),
        ]
        fzf_opts = [
            "-e",  # --exact
            # "--no-clear",
            "-m",  # --multi
            "--reverse",
            "--header-lines=1",
            "-d \xa0",  # --delimiter
            "--with-nth=1",
            "--expect=del,enter",  # first output line names the key pressed
        ]
        fzf_opts.append("--bind=" + ",".join(f"{b[0]}:{b[1]}" for b in fzf_binds))
        fzf_cmd = f"fzf {' '.join(fzf_opts)}"
        fzf = Popen(fzf_cmd, stdin=PIPE, stdout=PIPE, shell=True)
        fzf.stdin.write(bytes(fzf_input, "utf8"))
        fzf.stdin.close()
        results = fzf.stdout.readlines()
        if len(results) == 0:
            break
        key = results.pop(0)[:-1].decode("utf8")
        # Reduce each selected line to the hidden link.
        for i, line in enumerate(results):
            results[i] = line[:-1].split(b"\xa0")[1].decode("utf8")
        if key == "del":
            Q["videos"] = list(filter(lambda v: v["link"] not in results, videos))
            dump_queue(Q)
        elif key == "enter":
            # os.system("tput rmcup")
            mpv_out = os.popen("mpv --quiet --msg-level=all=no,cplayer=info " + " ".join(results))
            while True:
                line = mpv_out.readline()
                if line in ("", "\n"):  # blank line or EOF ends playback output
                    break
                if line.startswith("Playing: "):
                    # Drop each video from the queue as mpv starts playing it.
                    link = line.split(": ")[1][:-1]
                    Q["videos"] = list(filter(lambda v: v["link"] != link, videos))
                    Q["last_played"] = link
                    dump_queue(Q)
            # os.system("tput rmcup")

def usage():
    print(f"Usage: {sys.argv[0]} play | resume | fetch | list | push | pull")
    exit(1)

if __name__ == "__main__":
    if len(sys.argv) != 2:
        usage()
    if sys.argv[1] == "play":
        play_queue()
    elif sys.argv[1] == "resume":
        play_queue(True)
    elif sys.argv[1] == "fetch":
        renew_queue()
    elif sys.argv[1] == "list":
        list_videos()
    elif sys.argv[1] == "push":
        push_queue()
    elif sys.argv[1] == "pull":
        pull_queue()
    else:
        usage()
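Usage sketch (assuming the script is saved as youtube-queue and made executable, with subscription_manager.xml and youtube.json alongside it):

  ./youtube-queue fetch    # pull new videos from the subscription feeds
  ./youtube-queue play     # fzf picker: enter plays via mpv, del removes
  ./youtube-queue resume   # replay the last played video, then the picker
  ./youtube-queue list     # print the queue
  ./youtube-queue push     # upload youtube.json to a GitHub gist
  ./youtube-queue pull     # overwrite youtube.json from the gist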