#!/usr/bin/env python3
import logging
import argparse
import os
from typing import Callable, Collection, Iterable, Optional, Set, Tuple, NamedTuple
import glob
from functools import partial, lru_cache
import subprocess
import shlex
from itertools import chain as ichain, compress, tee
import unidecode
import datetime
import operator
from fn import F, _, Stream
from fn.op import call

logger: logging.Logger = logging.getLogger(__name__)
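
# rsync flags: -a (archive: recurse, preserve perms/times/links), -P
# (--partial --progress), --whole-file (skip the delta-transfer algorithm,
# faster on local disks), --update (skip files that are newer on the
# receiver), --one-file-system (do not cross mount points), --inplace
# (write updated data directly into the destination file).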
RSYNC = ["rsync", "-aP", "--whole-file", "--update", "--one-file-system", "--inplace"]
memoize = lru_cache(maxsize=None)


def streamdebug(message="Processed %s"):
    def debug(item):
        logger.debug(message, item)
        return item

    return debug
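
# streamdebug() builds an identity function that logs each item it is given,
# so it can be spliced into a pipeline without altering the data, e.g.
# map(streamdebug("saw %s"), items).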
# def depth_filter(depth: int) -> Callable:
#     return lambda dirpath: dirpath.count("/") == depth
# def trim_root(root: str, path: str) -> str:
#     return path.split(root, 1)[1].lstrip("/")
depth_filter = lambda depth: F(operator.eq, depth) << operator.methodcaller(
    "count", "/"
)
trim_root = (
    lambda root: F()
    << operator.methodcaller("lstrip", "/")
    << operator.itemgetter(1)
    << operator.methodcaller("split", root, 1)
)
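
# fn.py composes right to left, so `F() << f << g` computes f(g(x)):
#   trim_root("/srv/media/anime")("/srv/media/anime/Title/ep01.mkv")
#       -> "Title/ep01.mkv"
#   depth_filter(1)("Title/ep01.mkv") -> True  (exactly one "/")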


def pexec(args):
    logger.debug("exec: %s", " ".join(map(shlex.quote, args)))
    subprocess.check_call(args)


def walk(root: str, include_root: bool = False) -> Iterable[str]:
    """Yield non-hidden directories and files under root, relative to root
    unless include_root is set."""
    for croot, directories, files in os.walk(root):
        # prune in place so os.walk does not descend into hidden directories
        directories[:] = [d for d in directories if not d.startswith(".")]
        if not include_root:
            croot = croot.split(root, 1)[1].lstrip("/")
        for directory in directories:
            yield os.path.join(croot, directory)
        for filename in files:
            if filename.startswith("."):
                continue
            yield os.path.join(croot, filename)


class MediaItem(NamedTuple):
    title: "TitleItem"
    name: str
    locations: Set[str]

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self.title.name}/{self.name}({len(self.locations)})>"

    def set_location(self, location):
        self.locations.add(location)
        return self

    def __hash__(self):
        # NamedTuple's default hash would try to hash the mutable `locations`
        # set and raise TypeError; hash only the immutable fields.
        return hash(self.title) + hash(self.name)


class TitleItem(NamedTuple):
    library: "AnimeLib"
    name: str

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self.name}>"

    def media(self) -> Collection[MediaItem]:
        return call(
            F(filter, F(operator.eq, self) << operator.attrgetter("title")),
            self.library.media(),
        )

    @memoize
    def get_media(self, name: str) -> MediaItem:
        return MediaItem(self, name, set())
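
# get_media() above and AnimeLib.get_title() below are memoized, so each
# (title, name) pair resolves to a single interned item instance;
# set_location() then accumulates every place that item was seen.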


class MediaIndex(NamedTuple):
    source_path: str
    last_updated: datetime.datetime
    library: "AnimeLib"

    @memoize
    def media(self) -> Collection[MediaItem]:
        """
        Remote media.
        """
        return call(
            F(set)
            << F(
                map,
                F(streamdebug("Processed remote media: %s"))
                << (
                    lambda i: self.library.get_title(i[1])
                    .get_media(i[2])
                    .set_location("/".join(i))
                )
                << operator.methodcaller("rsplit", sep="/", maxsplit=2),
            )
            << F(filter, F() << depth_filter(1) << trim_root(self.source_path)),
            self.contents,
        )

    @property
    def contents(self):
        # NOTE: media() above keeps entries at relative depth 1 (title/item),
        # so this depth constant may need to be source_path.count("/") + 2.
        return Stream() << call(
            F()
            << F(filter, depth_filter(self.source_path.count("/") + 1))
            << F(filter, os.path.isdir),
            walk(self.source_path, include_root=True),
        )


class DiskIndex:
    def __init__(self, library: "AnimeLib") -> None:
        self.library = library

    @property
    def contents(self):
        return Stream() << walk(self.library.library_path, include_root=True)

    @memoize
    def titles(self) -> Set[TitleItem]:
        """
        On-disk titles.
        """
        # FIXME how to run a few filters at once? tee + compress?
        return call(
            F(set)
            << F(
                map,
                F(streamdebug("Processed on-disk title: %s"))
                << self.library.get_title
                << trim_root(self.library.library_path),
            )
            << F(
                filter,
                lambda a: all(
                    f(a)
                    for f in [
                        F() << depth_filter(0) << trim_root(self.library.library_path),
                        F(os.path.isdir),
                    ]
                ),
            ),
            self.contents,
        )
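
    # Re the FIXME above: chaining filters applies both predicates too, e.g.
    #   F(filter, os.path.isdir)
    #   << F(filter, F() << depth_filter(0) << trim_root(root))
    # tee/compress would only be needed if the predicates had to consume the
    # stream independently rather than per item.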

    @memoize
    def media(self) -> Collection[MediaItem]:
        """
        On-disk media.
        """
        # FIXME how to do update on mutable structure with a data copy??
        return call(
            F(set)
            << F(
                map,
                F(streamdebug("Processed on-disk media: %s"))
                << (
                    lambda i: self.library.get_title(i[1])
                    .get_media(i[2])
                    .set_location("/".join(i))
                )
                << operator.methodcaller("rsplit", sep="/", maxsplit=2),
            )
            << F(
                filter,
                F() << depth_filter(1) << trim_root(self.library.library_path),
            ),
            self.contents,
        )
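
# Assumed on-disk layout: {library_path}/{Title}/{Item}; titles() selects the
# Title level (relative depth 0) and media() the Item level (relative depth 1).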


class AnimeLib:
    def __init__(self) -> None:
        self.pending_path = "/srv/media/unsorted"
        self.library_path = "/srv/media/anime"
        self.backup_path = "/srv/backup/tape"
        self.disk_index = DiskIndex(self)

    @memoize
    def get_title(self, name: str) -> TitleItem:
        return TitleItem(self, name)

    def titles(self) -> Collection[TitleItem]:
        return self.disk_index.titles()

    def media(self) -> Collection[MediaItem]:
        return self.disk_index.media()

    def pending(self) -> Collection[str]:
        return filter(depth_filter(0), walk(self.pending_path))

    def collection(self) -> Set[str]:
        return set(filter(depth_filter(1), walk(self.library_path)))

    def backup(self) -> Set[str]:
        return set(filter(depth_filter(1), walk(self.backup_path)))

    def backup_item(self, item: str, dst: str) -> bool:
        src = os.path.join(self.pending_path, item)
        logger.info("%s -> %s", src, dst)
        pexec(RSYNC + [src, os.path.dirname(dst)])
        return True

    def backup_library_item(self, item: str, title: str, dst: str) -> bool:
        src = os.path.join(self.library_path, title, item)
        logger.info("%s -> %s", src, dst)
        pexec(RSYNC + [src, os.path.dirname(dst)])
        return True

    def archive_backup(self, src: str, title: str) -> bool:
        item = os.path.basename(src)
        title_dir = os.path.join(self.backup_path, title)
        dst = os.path.join(self.backup_path, title, item)
        logger.info("%s -> %s", src, dst)
        if not os.path.exists(title_dir):
            os.mkdir(title_dir)
        if not os.path.exists(dst) and not os.path.islink(dst):
            os.symlink(src, dst)
        return True

    def archive_item(self, item: str, title: str) -> bool:
        src = os.path.join(self.pending_path, item)
        dst = os.path.join(self.library_path, title, item)
        logger.info("%s -> %s", src, dst)
        os.rename(src, dst)
        return True
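
# Flow, as implemented above: backup_item() rsyncs an unsorted item onto the
# mounted tape-staging volume, archive_backup() records that copy under
# /srv/backup/tape/{title} as a symlink, and archive_item() then moves the
# item itself into the library.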


def cleanup(string: str) -> str:
    return unidecode.unidecode(
        string.replace("-", " ").replace("_", " ").replace(".", " ").lower()
    )
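
# e.g. cleanup("Sōme_Title-01.mkv") -> "some title 01 mkv"; unidecode folds
# accented characters to ASCII so title matching is diacritic-insensitive.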

PendingItem = Tuple[str, str]


def search_pending(
    pending: Collection[str], titles: Collection[str], backup_storage: Optional[str]
) -> Collection[PendingItem]:
    title_matchers = list(map(lambda t: (t, cleanup(t)), titles))
    for item in pending:
        matcher = cleanup(item)
        logger.debug("matcher = %s", matcher)
        for title, title_matcher in title_matchers:
            if title_matcher == "orange":
                # presumably too short a title: its cleaned form would
                # substring-match far too many filenames
                continue
            if title_matcher in matcher:
                if backup_storage:
                    logger.info(
                        "Archive %s as %s and backup to %s",
                        item,
                        title,
                        os.path.basename(backup_storage),
                    )
                else:
                    logger.info("Archive %s as %s", item, title)
                yield (item, title)


def process_pending(
    library: AnimeLib, backup_storage: Optional[str], a: PendingItem
) -> Collection[bool]:
    item, title = a
    if backup_storage:
        backup = os.path.join(backup_storage, item)
        yield library.backup_item(item, backup)
        yield library.archive_backup(backup, title)
    yield library.archive_item(item, title)


PendingBackup = Tuple[str, str]


def resolve_pending_backup(
    pending_backup: Collection[str], backup_storage: Optional[str]
) -> Collection[PendingBackup]:
    if backup_storage:
        for path in pending_backup:
            title = os.path.dirname(path)
            item = os.path.basename(path)
            if not os.path.exists(os.path.join(backup_storage, item)):
                logger.info("Backup %s to %s", path, os.path.basename(backup_storage))
                yield (item, title)


def process_pending_backup(
    library: AnimeLib, backup_storage: Optional[str], a: PendingBackup
) -> Collection[bool]:
    item, title = a
    if backup_storage:
        backup = os.path.join(backup_storage, item)
        yield library.backup_library_item(item, title, backup)
        yield library.archive_backup(backup, title)


# def search_lost_backup(
#     pending_backup: Collection[str], search_path: Collection[str]
# ) -> Collection[PendingBackup]:
#     """
#     1. Accepts collections of missed backups and search paths
#     2. Globs search paths
#     3. Match items
#     """
#     pending_hash = {
#         os.path.basename(item): os.path.dirname(item) for item in pending_backup
#     }
#     candidates = [
#         (os.path.join(d, c), pending_hash.get(c))
#         for d in search_path
#         for c in walk(d)
#         if depth_filter(0)(c) and c in pending_hash
#     ]
#     if candidates:
#         logger.info("Candidates: %s", candidates)
#     elif pending_backup:
#         logger.error("No candidates available.")
#
#     return candidates
#
#
# def process_lost_backup(library: AnimeLib, a: PendingBackup) -> Collection[bool]:
#     src, title = a
#     yield library.archive_backup(src, title)


def write_index(library: AnimeLib, index_file: str = "index.md") -> None:
    fn = os.path.join(library.library_path, index_file)
    with open(fn, "w+") as f:
        for title in library.titles():
            f.write(f"* {title}\n")


def main(debug: bool = False) -> None:
    """
    Parse arguments, index the library, report pending work, and (with
    --clowntown) perform the archive and backup actions.
    """
    parser = argparse.ArgumentParser(description="archiving anime to tape")
    parser.add_argument("--debug", action="store_true", default=debug)
    parser.add_argument(
        "--clowntown",
        action="store_true",
        default=False,
        help="actually perform the archive/backup actions (default: log-only dry run)",
    )
    args = parser.parse_args()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    library = AnimeLib()
    logger.debug("Titles: %s", list(library.titles()))
    for i in library.titles():
        logger.debug("%s: %s", i, list(i.media()))
    write_index(library)
    storage_path = glob.glob("/mnt/anime_A0000*")
    logger.debug("storage_path = %s", storage_path)
    active_storages = list(filter(os.path.ismount, storage_path))
    logger.debug("active_storages = %s", active_storages)
    backup_storage = active_storages[0] if active_storages else None
    pending_archival = list(library.pending())
    if pending_archival:
        logger.warning("Pending %s", pending_archival)
    pending_backup = library.collection() - library.backup()
    if pending_backup:
        logger.warning("Pending backup %s", pending_backup)
    library_titles = map(str, library.titles())
    actions = ichain(
        *map(
            partial(process_pending, library, backup_storage),
            search_pending(pending_archival, library_titles, backup_storage),
        ),
        *map(
            partial(process_pending_backup, library, backup_storage),
            resolve_pending_backup(pending_backup, backup_storage),
        ),
        # *map(
        #     partial(process_lost_backup, library),
        #     search_lost_backup(pending_backup, storage_path),
        # ),
    )
    if args.clowntown:
        result = all(actions)
        logger.debug("result = %s", result)
        if not result:
            raise SystemError


if __name__ == "__main__":
    main()