Last active
February 3, 2020 00:02
-
-
Save tnymlr/ce43bfe607e46c9c04e47ab5a0e631b3 to your computer and use it in GitHub Desktop.
youman
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import shutil | |
import subprocess | |
import random | |
from argparse import ArgumentParser | |
from argparse import REMAINDER | |
from pathlib import Path | |
from string import ascii_lowercase as letters | |
from typing import List | |
from typing import Dict | |
from xdg import XDG_CACHE_HOME | |
from fasteners import InterProcessLock | |
YOUMAN_DIR = XDG_CACHE_HOME / Path('youman') | |
WATCH_LIST_PATH = YOUMAN_DIR / Path('list') | |
DOWNLOAD_DIR = YOUMAN_DIR / Path('downloads') | |
SEEN_DIR = YOUMAN_DIR / Path('seen') | |
FILE_NAME_TPL = '%(uploader)s - %(title)s.%(ext)s' | |
MENU_FONT = 'NotoSans Nerd Font SemiCondensed Medium 13' | |
MENU_LINES = 24 | |
def notify(msg): | |
subprocess.run([ | |
'notify-send', | |
'-t', | |
'6000', | |
'YouTube:', | |
msg | |
]) | |
def on_progress(d): | |
name = Path(d['filename']).stem | |
if d['status'] == 'finished': | |
notify('Downloaded {}'.format(name)) | |
YDL_OPTS = {'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]', | |
'outtmpl': '{}/{}'.format(DOWNLOAD_DIR.as_posix(), FILE_NAME_TPL), | |
'progress_hooks': [on_progress], | |
'prefer_ffmpeg': True} | |
class DownloadGuard: | |
def __init__(self, acquire=False): | |
self.__lock = InterProcessLock(YOUMAN_DIR / 'lock') | |
self.__acquired = False | |
self.__acquire = acquire | |
def __enter__(self): | |
if self.__acquire: | |
self.lock() | |
return self | |
def __exit__(self, type, value, traceback): | |
self.release() | |
def lock(self): | |
self.__lock.acquire(blocking=True) | |
self.__acquired = True | |
def try_lock(self) -> bool: | |
self.__acquired = self.__lock.acquire(blocking=False) | |
return self.__acquired | |
def release(self): | |
if self.__acquired: | |
self.__lock.release() | |
self.__acquired = False | |
def ensure_dirs(): | |
DOWNLOAD_DIR.mkdir(exist_ok = True, parents = True) | |
SEEN_DIR.mkdir(exist_ok = True, parents = True) | |
def get_args(actions: List): | |
parser = ArgumentParser(prog='youman') | |
parser.add_argument('action', action='store', nargs='?', | |
help='action to perform', default='echo', | |
choices=actions) | |
parser.add_argument('urls', nargs=REMAINDER) | |
return parser.parse_args() | |
def do_echo(args): | |
print('Directory: {}'.format(YOUMAN_DIR)) | |
print('List file: {}'.format(WATCH_LIST_PATH)) | |
print('Download dir: {}'.format(DOWNLOAD_DIR)) | |
print('URLs: {}'.format(args.urls)) | |
def do_add(args): | |
# validators library does something to stdout | |
# that bemenu won't show up after call | |
# to the script, so I moved import here | |
# to avoid affecting 'watch' operations | |
from validators import url as valid | |
with DownloadGuard(acquire=True): | |
with open(WATCH_LIST_PATH.as_posix(), 'a') as list: | |
for url in args.urls: | |
if 'youtube' in url: | |
if valid(url): | |
if 'watch?v=' in url: | |
list.write(url + '\n') | |
notify("Added {}".format(url)) | |
else: | |
msg = ("URL [{}] does not appear to be a YouTube" + | |
" video link, skipping").format(url) | |
notify(msg) | |
else: | |
notify('URL [{}] is mailformed, skipping'.format(url)) | |
else: | |
msg = ("URL [{}] doesn't not seem " + | |
"to belong to YouTube, skipping").format(url) | |
notify(msg) | |
def do_list(args): | |
if WATCH_LIST_PATH.exists(): | |
with open(WATCH_LIST_PATH.as_posix(), 'r') as list: | |
for url in list.readlines(): | |
print(url, end = '') | |
def do_clean(args): | |
shutil.rmtree(SEEN_DIR) | |
def do_watch(args): | |
names: Dict = {} | |
videos: List = [] | |
for r, d, f in os.walk(DOWNLOAD_DIR.as_posix()): | |
for file in f: | |
file = Path(file) | |
if file.suffix == '.part': | |
continue | |
else: | |
names[file.stem] = file.name | |
videos.append(file.stem) | |
cmd = "echo -e '{}' | bemenu -l {} -i -p YouTube: --fn '{}'".format( | |
'\n'.join(videos).replace("'", "'\\''"), | |
MENU_LINES, | |
MENU_FONT) | |
result = subprocess.run(['/bin/sh', '-c', cmd], stdout=subprocess.PIPE) | |
choice = result.stdout.decode('utf-8') | |
if choice: | |
choice = choice.replace('\n', '') | |
old_path = DOWNLOAD_DIR / names[choice] | |
result = subprocess.run(['mpv', old_path]) | |
if result.returncode == 0: | |
new_path = SEEN_DIR / names[choice] | |
shutil.move(str(old_path), str(new_path)) | |
notify("Moved {} to the 'seen' folder".format(choice)) | |
else: | |
notify("Failed to move {} to the 'seen' folder".format(choice)) | |
def get_download_list() -> Path: | |
lists = [] | |
for r, d, f in os.walk(YOUMAN_DIR): | |
for file in f: | |
file = Path(file) | |
if file.stem == WATCH_LIST_PATH.stem: | |
suffix = file.suffix | |
if (suffix and suffix.startswith('.')): | |
lists.append(YOUMAN_DIR / file) | |
if len(lists) > 1: | |
# can't have more than one active download session | |
raise RuntimeError('Unexpected amount of active download lists') | |
elif len(lists) > 0: | |
return lists[0] | |
else: | |
return None | |
def create_download_list() -> Path: | |
if WATCH_LIST_PATH.exists(): | |
id = ''.join(random.choice(letters) for i in range(10)) | |
name = '{}.{}'.format(WATCH_LIST_PATH.stem, id) | |
path = YOUMAN_DIR / name | |
with DownloadGuard() as guard: | |
if guard.try_lock(): | |
shutil.move(WATCH_LIST_PATH, path) | |
return path | |
else: | |
print('Unable to acquire lock, skipping...') | |
return None | |
else: | |
print('Nothing to download') | |
return None | |
def clean_download_list(list: Path): | |
if list.exists(): | |
list.unlink() | |
def do_download_list(list: Path): | |
# this import is quite slow and we don't need it | |
# in other operations, so I keep it here to make | |
# other operations quicker | |
from youtube_dl import YoutubeDL | |
urls = [] | |
with open(list, 'r') as file: | |
for line in file.readlines(): | |
urls.append(line) | |
with YoutubeDL(YDL_OPTS) as ydl: | |
notify('Download has started') | |
ydl.download(urls) | |
def do_download(args): | |
list = get_download_list() | |
if not list: | |
list = create_download_list() | |
if list: | |
do_download_list(list) | |
clean_download_list(list) | |
def main(): | |
actions = {"echo": do_echo, | |
"add": do_add, | |
"list": do_list, | |
"clean": do_clean, | |
"watch": do_watch, | |
"download": do_download} | |
ensure_dirs() | |
args = get_args(list(actions.keys())) | |
action = actions[args.action] | |
action(args) | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit] | |
Description=Downloads YouTube Videos | |
ConditionACPower=true | |
[Service] | |
WorkingDirectory=/home/CHANGE_THAT_TO_YOUR_USERNAME | |
ExecStart=/usr/bin/python /home/CHANGE_THAT_TO_YOUR_USERNAME/CHANGE_TO_PATH_TO/youman.py download |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit] | |
Description=Try to download new youtube videos every minute | |
[Timer] | |
Unit=youman.service | |
OnCalendar=*-*-* *:*:00 | |
[Install] | |
WantedBy=timers.target |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
pip install --user fasteners xdg youtube-dl
and it should run. Or not, I don't remember precise dependencies. Works on my machine (tm)